Module: ZQ::Orchestra

Included in:
Echo
Defined in:
lib/zq/orchestra.rb

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



49
50
51
52
# File 'lib/zq/orchestra.rb', line 49

def self.included(base)
  ::ZQ.autoregister_orchestra(base)
  base.extend ClassMethods
end

Instance Method Details

#compose(item) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/zq/orchestra.rb', line 106

def compose(item)
    composite = nil
    begin
      @composers.each do |c|
        composite = c.compose item, composite
      end
    rescue
      raise unless self.class.ignore_errors?
    end
end

#initializeObject



87
88
89
90
91
92
93
# File 'lib/zq/orchestra.rb', line 87

def initialize
  @source, @composers = [:@source, :@composers].map do |m|
    self.class.instance_variable_get(m)
  end
  fail NoSourceProvided unless @source
  fail NoComposerProvided unless @composers
end

#process_forever(interval = 1) ⇒ Object



95
96
97
98
99
# File 'lib/zq/orchestra.rb', line 95

def process_forever(interval = 1)
  loop do
    process_with_interval(interval)
  end
end

#process_until_exhaustedObject



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/zq/orchestra.rb', line 117

def process_until_exhausted
  this = self
  catch :exhausted do
    loop do
      if @source.transactional?
        @source.transaction do |item|
          throw :exhausted if item.nil?
          this.compose(item)
        end
      else
        item = @source.read_next
        throw :exhausted if item.nil?
        compose(item)
      end
    end
  end
end

#process_with_interval(interval) ⇒ Object



101
102
103
104
# File 'lib/zq/orchestra.rb', line 101

def process_with_interval(interval)
    process_until_exhausted
    Kernel.sleep(interval)
end