Module: ZQ::Orchestra
- Included in:
- Echo
- Defined in:
- lib/zq/orchestra.rb
Defined Under Namespace
Modules: ClassMethods
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #compose(item) ⇒ Object
- #initialize ⇒ Object
- #process_forever(interval = 1) ⇒ Object
- #process_until_exhausted ⇒ Object
- #process_with_interval(interval) ⇒ Object
Class Method Details
.included(base) ⇒ Object
49 50 51 52 |
# File 'lib/zq/orchestra.rb', line 49
#
# Inclusion hook: hands the including class to ZQ's orchestra
# auto-registration and then extends it with ClassMethods.
#
# @param base [Module] the class or module this module is being included into
# @return [Object] the result of `base.extend` (the last expression)
def self.included(base)
  ::ZQ.autoregister_orchestra(base)
  base.extend(ClassMethods)
end
Instance Method Details
#compose(item) ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/zq/orchestra.rb', line 106
#
# Runs +item+ through every composer in @composers, in order. Each composer
# receives the item plus the value produced by the previous composer (nil for
# the first one).
#
# Any StandardError raised along the way is re-raised unless the including
# class answers truthy to `ignore_errors?`, in which case it is swallowed and
# the remaining composers are skipped.
#
# NOTE(review): the accumulated value is not returned; the method evaluates to
# the result of `@composers.each` on success and to nil when an error is
# swallowed. Confirm no caller expects the composite.
#
# @param item [Object] the item to hand to each composer
# @return [Object, nil] see the note above
def compose(item)
  accumulated = nil
  @composers.each { |composer| accumulated = composer.compose(item, accumulated) }
rescue
  raise unless self.class.ignore_errors?
end
#initialize ⇒ Object
87 88 89 90 91 92 93 |
# File 'lib/zq/orchestra.rb', line 87
#
# Copies the class-level @source and @composers instance variables of the
# including class onto the new instance and verifies that both are set.
#
# @raise [NoSourceProvided] if the class-level @source is nil or false
# @raise [NoComposerProvided] if the class-level @composers is nil or false
def initialize
  klass = self.class
  @source = klass.instance_variable_get(:@source)
  @composers = klass.instance_variable_get(:@composers)

  raise NoSourceProvided unless @source
  raise NoComposerProvided unless @composers
end
#process_forever(interval = 1) ⇒ Object
95 96 97 98 99 |
# File 'lib/zq/orchestra.rb', line 95
#
# Repeatedly calls #process_with_interval with the given interval. The
# repetition uses Kernel#loop, so it only stops if an exception escapes
# (Kernel#loop itself ends quietly on StopIteration).
#
# @param interval [Numeric] value forwarded to #process_with_interval on
#   every pass (defaults to 1)
# @return [Object] whatever Kernel#loop returns if it ever stops
def process_forever(interval = 1)
  loop { process_with_interval(interval) }
end
#process_until_exhausted ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/zq/orchestra.rb', line 117
#
# Pulls items from @source and passes each one to #compose until the source
# yields nil, which is treated as "exhausted".
#
# On every pass the source is asked whether it is transactional:
# * if so, the item is obtained through `@source.transaction`, and #compose
#   is invoked inside that block;
# * otherwise the item is fetched with `@source.read_next`.
#
# Only nil ends the run; a `false` item is still composed.
#
# @return [nil] the value of the `catch` once :exhausted is thrown
def process_until_exhausted
  # Captured so the transaction block can address this object explicitly.
  # NOTE(review): presumably the source may run the block with a different
  # `self` — confirm against the source implementations.
  orchestra = self

  catch(:exhausted) do
    loop do
      unless @source.transactional?
        fetched = @source.read_next
        throw :exhausted if fetched.nil?
        compose(fetched)
        next
      end

      @source.transaction do |item|
        throw :exhausted if item.nil?
        orchestra.compose(item)
      end
    end
  end
end
#process_with_interval(interval) ⇒ Object
101 102 103 104 |
# File 'lib/zq/orchestra.rb', line 101
#
# Performs one full drain via #process_until_exhausted and then pauses for
# +interval+ by calling Kernel.sleep.
#
# @param interval [Numeric] duration passed to Kernel.sleep
# @return [Integer] the value returned by Kernel.sleep
def process_with_interval(interval)
  process_until_exhausted
  Kernel.sleep(interval)
end