Class: Henchman::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/henchman/worker.rb

Overview

A class that handles incoming messages.

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, &block) ⇒ Worker

Returns a new instance of Worker.

Parameters:

  • queue_name (String)

    the name of the queue this worker listens to.

  • exchange_type (Symbol)

    the type of exchange this worker will connect its queue to.

  • block (Proc)

    the Proc that will handle the messages for this Henchman::Worker.



105
106
107
108
# File 'lib/henchman/worker.rb', line 105

# Build a worker bound to a queue name and a message-handling block.
#
# @param queue_name [String] the name of the queue this worker listens to.
# @param block [Proc] the Proc that will handle the messages for this worker.
def initialize(queue_name, &block)
  @queue_name = queue_name
  @block = block
end

Instance Attribute Details

#block ⇒ Object

Proc

the Proc handling the messages for this Henchman::Worker.



98
99
100
# File 'lib/henchman/worker.rb', line 98

# Reader for the Proc handling the messages for this Henchman::Worker.
#
# @return [Proc, nil] the current value of @block.
def block
  @block
end

#consumer ⇒ Object

AMQP::Consumer

the consumer feeding this Henchman::Worker with messages.



93
94
95
# File 'lib/henchman/worker.rb', line 93

# Reader for the consumer feeding this Henchman::Worker with messages.
#
# @return [AMQP::Consumer, nil] the current value of @consumer; nil while
#   no consumer has been assigned.
def consumer
  @consumer
end

#queue_name ⇒ Object

String

the name of the queue this Henchman::Worker listens to.



88
89
90
# File 'lib/henchman/worker.rb', line 88

# Reader for the name of the queue this Henchman::Worker listens to.
#
# @return [String] the current value of @queue_name.
def queue_name
  @queue_name
end

Instance Method Details

#call(message, headers = nil) ⇒ Henchman::Worker::Task

Call this worker with some data.

Parameters:

  • headers (AMQP::Header) (defaults to: nil)

    the headers to handle.

  • message (Object)

    the message to handle.

Returns:



169
170
171
# File 'lib/henchman/worker.rb', line 169

# Handle one message by wrapping it in a Task and running that task.
#
# @param message [Object] the message to handle.
# @param headers [AMQP::Header, nil] the headers to handle.
# @return whatever Task#call returns for this message.
def call(message, headers = nil)
  task = Task.new(self, headers, message)
  task.call
end

#consume! ⇒ Object

Make this Henchman::Worker subscribe to a direct exchange.



153
154
155
156
157
158
159
# File 'lib/henchman/worker.rb', line 153

# Make this Henchman::Worker subscribe to a direct exchange.
#
# Asks Henchman.with_queue for the queue named #queue_name, hands it to
# #subscribe_to, then waits on the deferrable via EM::Synchrony.sync.
#
# @return whatever EM::Synchrony.sync returns for the deferrable.
def consume!
  subscribed = EM::DefaultDeferrable.new
  Henchman.with_queue(queue_name) { |queue| subscribe_to(queue, subscribed) }
  EM::Synchrony.sync(subscribed)
end

#subscribe! ⇒ Object

Make this Henchman::Worker subscribe to a fanout exchange.



142
143
144
145
146
147
148
# File 'lib/henchman/worker.rb', line 142

# Make this Henchman::Worker subscribe to a fanout exchange.
#
# Asks Henchman.with_fanout_queue for a queue based on #queue_name, hands it
# to #subscribe_to, then waits on the deferrable via EM::Synchrony.sync.
#
# @return whatever EM::Synchrony.sync returns for the deferrable.
def subscribe!
  subscribed = EM::DefaultDeferrable.new
  Henchman.with_fanout_queue(queue_name) { |queue| subscribe_to(queue, subscribed) }
  EM::Synchrony.sync(subscribed)
end

#subscribe_to(queue, deferrable) ⇒ Object

Subscribe this Henchman::Worker to a queue.

Parameters:

  • queue (AMQP::Queue)

    the AMQP::Queue to subscribe the Henchman::Worker to.

  • deferrable (EM::Deferrable)

    an EM::Deferrable that will succeed when the subscription is done.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/henchman/worker.rb', line 116

# Subscribe this Henchman::Worker to a queue.
#
# Builds an AMQP::Consumer on the channel yielded by Henchman.with_channel,
# stores it in @consumer, and routes each delivery through #call after
# decoding the payload with MultiJson. Deliveries are ignored unless the
# queue's channel status is :opened.
#
# Errors raised while handling a message are printed to STDERR and swallowed
# so one failing message does not escape the delivery callback. SystemExit
# and SignalException (which includes Interrupt) are re-raised so that an
# exit request or a signal arriving during handling can still stop the worker.
#
# @param queue [AMQP::Queue] the AMQP::Queue to subscribe this worker to.
# @param deferrable [EM::Deferrable] marked :succeeded from the block passed
#   to consumer.consume, i.e. when the subscription is done.
def subscribe_to(queue, deferrable)
  Henchman.with_channel do |channel|
    @consumer = AMQP::Consumer.new(channel,
                                   queue,
                                   queue.generate_consumer_tag(queue.name), # consumer_tag
                                   false, # exclusive
                                   false) # no_ack
    consumer.on_delivery do |headers, data|
      next unless queue.channel.status == :opened

      begin
        call(MultiJson.decode(data), headers)
      rescue SystemExit, SignalException
        # Shutdown requests must not be absorbed by the catch-all below.
        raise
      rescue Exception => e
        # Deliberately broad: log and keep consuming on any other failure.
        STDERR.puts e
        STDERR.puts e.backtrace.join("\n")
      end
    end
    consumer.consume do
      deferrable.set_deferred_status :succeeded
    end
  end
end

#unsubscribe! ⇒ Object

Unsubscribe this Henchman::Worker from its queue.



176
177
178
179
180
181
182
183
184
# File 'lib/henchman/worker.rb', line 176

# Unsubscribe this Henchman::Worker from its queue.
#
# Cancels the current consumer and marks a deferrable :succeeded from the
# cancel callback. The wait on that deferrable runs inside a freshly created
# Fiber that is resumed immediately.
#
# @return whatever the Fiber's first resume returns.
def unsubscribe!
  cancelled = EM::DefaultDeferrable.new
  consumer.cancel { cancelled.set_deferred_status :succeeded }
  waiter = Fiber.new { EM::Synchrony.sync cancelled }
  waiter.resume
end