Class: Henchman::Worker
- Inherits: Object
  - Object
    - Henchman::Worker
- Defined in: lib/henchman/worker.rb
Overview
A class that handles incoming messages.
Defined Under Namespace
Classes: Task
Instance Attribute Summary
- #block ⇒ Object
  Proc: the Proc handling the messages for this Worker.
- #consumer ⇒ Object
  AMQP::Consumer: the consumer feeding this Worker with messages.
- #queue_name ⇒ Object
  String: the name of the queue this Worker listens to.
Instance Method Summary
- #call(message, headers = nil) ⇒ Henchman::Worker::Task
  Call this worker with some data.
- #consume! ⇒ Object
  Make this Worker subscribe to a direct exchange.
- #initialize(queue_name, &block) ⇒ Worker (constructor)
  A new instance of Worker.
- #subscribe! ⇒ Object
  Make this Worker subscribe to a fanout exchange.
- #subscribe_to(queue, deferrable) ⇒ Object
  Subscribe this Worker to a queue.
- #unsubscribe! ⇒ Object
  Unsubscribe this Worker from its queue.
Constructor Details
#initialize(queue_name, &block) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/henchman/worker.rb', line 105
def initialize(queue_name, &block)
  @block = block
  @queue_name = queue_name
end
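The constructor above only stores its two arguments. A minimal sketch of that behaviour follows. Loading the real class needs the henchman gem and an AMQP broker, so the block uses a hypothetical stand-in, `SketchWorker`, that copies only the constructor and readers shown on this page. The queue name "greetings" and the block body are illustrative assumptions. How the real library hands a message to the block is decided by Task, which is not shown here.

```ruby
# Hypothetical stand-in that mirrors only the documented constructor and
# readers of Henchman::Worker. It is not the real class.
class SketchWorker
  attr_reader :queue_name, :block

  def initialize(queue_name, &block)
    @block = block
    @queue_name = queue_name
  end
end

# The block is captured as a Proc and kept for later use; nothing runs yet.
worker = SketchWorker.new("greetings") { :handled }

name = worker.queue_name      # the queue name passed to the constructor
result = worker.block.call    # invoking the stored Proc directly
```

Nothing is subscribed at construction time; in the real class that happens only when consume! or subscribe! is called.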
Instance Attribute Details
#block ⇒ Object
Proc: the Proc handling the messages for this Henchman::Worker.
# File 'lib/henchman/worker.rb', line 98
def block
  @block
end
#consumer ⇒ Object
AMQP::Consumer: the consumer feeding this Henchman::Worker with messages.
# File 'lib/henchman/worker.rb', line 93
def consumer
  @consumer
end
#queue_name ⇒ Object
String: the name of the queue this Henchman::Worker listens to.
# File 'lib/henchman/worker.rb', line 88
def queue_name
  @queue_name
end
Instance Method Details
#call(message, headers = nil) ⇒ Henchman::Worker::Task
Call this worker with some data.
# File 'lib/henchman/worker.rb', line 169
def call(message, headers = nil)
  Task.new(self, headers, message).call
end
#consume! ⇒ Object
Make this Henchman::Worker subscribe to a direct exchange.
# File 'lib/henchman/worker.rb', line 153
def consume!
  deferrable = EM::DefaultDeferrable.new
  Henchman.with_queue(queue_name) do |queue|
    subscribe_to(queue, deferrable)
  end
  EM::Synchrony.sync deferrable
end
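consume!, subscribe! and unsubscribe! all follow the same handshake: create a deferrable, mark it succeeded from a broker callback, and pass it to EM::Synchrony.sync, which pauses the calling fiber until the deferrable completes. The sketch below imitates that handshake in plain Ruby so it runs without EventMachine. `SketchDeferrable` and the `events` log are hypothetical names used only for illustration; the real EM::DefaultDeferrable and EM::Synchrony.sync need a running reactor.

```ruby
# Hypothetical, much-reduced stand-in for a deferrable: it remembers
# callbacks and fires them once it is marked as succeeded.
class SketchDeferrable
  def initialize
    @callbacks = []
    @succeeded = false
  end

  def callback(&blk)
    @succeeded ? blk.call : @callbacks << blk
  end

  def succeed
    @succeeded = true
    @callbacks.shift.call until @callbacks.empty?
  end
end

events = []
deferrable = SketchDeferrable.new

# The fiber plays the role of the code calling consume!.
waiter = Fiber.new do
  events << :waiting
  Fiber.yield              # pause here, roughly what a sync call does
  events << :subscribed    # reached only after the deferrable succeeds
end

deferrable.callback { waiter.resume }  # wake the fiber on success
waiter.resume                          # run until the fiber pauses
events << :broker_confirms             # the broker acknowledges the consumer
deferrable.succeed                     # fires the callback, resuming the fiber
```

The effect for the caller is that consume! appears to block until the subscription is confirmed, while other fibers remain free to run.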
#subscribe! ⇒ Object
Make this Henchman::Worker subscribe to a fanout exchange.
# File 'lib/henchman/worker.rb', line 142
def subscribe!
  deferrable = EM::DefaultDeferrable.new
  Henchman.with_fanout_queue(queue_name) do |queue|
    subscribe_to(queue, deferrable)
  end
  EM::Synchrony.sync deferrable
end
#subscribe_to(queue, deferrable) ⇒ Object
Subscribe this Henchman::Worker to a queue.
# File 'lib/henchman/worker.rb', line 116
def subscribe_to(queue, deferrable)
  Henchman.with_channel do |channel|
    @consumer = AMQP::Consumer.new(channel,
                                   queue,
                                   queue.generate_consumer_tag(queue.name), # consumer_tag
                                   false, # exclusive
                                   false) # no_ack
    consumer.on_delivery do |headers, data|
      if queue.channel.status == :opened
        begin
          call(MultiJson.decode(data), headers)
        rescue Exception => e
          STDERR.puts e
          STDERR.puts e.backtrace.join("\n")
        end
      end
    end
    consumer.consume do
      deferrable.set_deferred_status :succeeded
    end
  end
end
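The on_delivery handler above decodes each payload as JSON, passes it on, and logs any failure instead of letting it escape. The sketch below isolates that decode-and-log step so it can be run without a broker. `handle_delivery`, its `err:` parameter and the lambda handler are hypothetical names. It uses the standard library's JSON in place of MultiJson, rescues StandardError rather than Exception so interrupts still propagate, and leaves out the channel status check.

```ruby
require "json"
require "stringio"

# Hypothetical helper modelled on the on_delivery body: decode the payload,
# hand it to the handler, and log any failure instead of raising.
def handle_delivery(data, headers, handler, err: $stderr)
  handler.call(JSON.parse(data), headers)
rescue StandardError => e
  err.puts e
  err.puts e.backtrace.join("\n")
  nil
end

received = []
handler = ->(message, headers) { received << [message, headers] }
log = StringIO.new

handle_delivery('{"id": 1}', nil, handler, err: log)  # decoded and delivered
handle_delivery("not json", nil, handler, err: log)   # logged, not raised
```

Because errors are only logged, a malformed message does not stop the consumer; whether such a message is acknowledged or redelivered depends on the consumer settings, which this sketch does not model.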
#unsubscribe! ⇒ Object
Unsubscribe this Henchman::Worker from its queue.
# File 'lib/henchman/worker.rb', line 176
def unsubscribe!
  deferrable = EM::DefaultDeferrable.new
  consumer.cancel do
    deferrable.set_deferred_status :succeeded
  end
  Fiber.new do
    EM::Synchrony.sync deferrable
  end.resume
end