Module: Henchman

Extended by:
Henchman
Included in:
Henchman
Defined in:
lib/henchman.rb,
lib/henchman/worker.rb

Overview

Thin wrapper around AMQP

Defined Under Namespace

Classes: Worker

Constant Summary collapse

# Shared connection object; starts out nil and is assigned by #with_connection.
@@connection =
nil
# Shared channel object; starts out nil and is assigned by #with_channel.
@@channel =
nil
# Default error handler: writes a description of the failed consume call,
# followed by the exception backtrace, to STDERR.
# NOTE(review): queue_name, headers, message and exception are not block
# parameters, so this block presumably has to be evaluated in a context that
# provides them (e.g. via instance_eval on a worker) -- confirm against the caller.
@@error_handler =
Proc.new do
  STDERR.puts("consume(#{queue_name.inspect}, #{headers.inspect}, #{message.inspect}): #{exception.message}")
  STDERR.puts(exception.backtrace.join("\n"))
end
# Default log handler: prints the given message with puts.
@@logger =
Proc.new do |msg|
  puts msg
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.error_handler ⇒ Proc



45
46
47
# File 'lib/henchman.rb', line 45

# Return the currently configured error handler.
#
# @return [Proc] the block stored in @@error_handler (the default one, or
#   whatever was last installed through #error).
def self.error_handler
  @@error_handler
end

Instance Method Details

#aenqueue(queue_name, message, headers = {}) ⇒ EM::Deferrable

Enqueue a message asynchronously.



261
262
263
264
265
266
267
268
269
# File 'lib/henchman.rb', line 261

# Enqueue a message asynchronously.
#
# The message is encoded with MultiJson and published on the direct exchange
# using queue_name as the routing key.
#
# @param [String] queue_name the routing key to publish with.
# @param [Object] message the payload handed to MultiJson.encode.
# @param [Hash] headers the headers sent along with the message.
# @return [EM::Deferrable] a deferrable marked :succeeded once the publish
#   callback has run.
def aenqueue(queue_name, message, headers = {})
  EM::DefaultDeferrable.new.tap do |outcome|
    with_direct_exchange do |exchange|
      payload = MultiJson.encode(message)
      exchange.publish(payload, headers: headers, routing_key: queue_name) do
        outcome.set_deferred_status(:succeeded)
      end
    end
  end
end

#amqp_options ⇒ Hash

Will return the default options when connecting to the AMQP broker.

Uses the URL from #amqp_url to construct these options.



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/henchman.rb', line 81

# Build the default options used when connecting to the AMQP broker.
#
# The options are derived from the URL returned by #amqp_url. Missing
# credentials default to "guest"/"guest" and a missing port to 5672.
# The URL path is passed through unchanged as the vhost.
#
# @return [Hash] a Hash with the keys :vhost, :host, :user, :port and :pass.
# @raise [RuntimeError] if the URL cannot be turned into options; the message
#   contains the offending URL and the underlying error.
def amqp_options
  uri = URI.parse(amqp_url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user || "guest",
    :port => uri.port || 5672,
    :pass => uri.password || "guest"
  }
rescue StandardError => e
  # Report the raw URL: the local `uri` is still nil when URI.parse itself
  # fails, which made the old message read "invalid AMQP_URL: nil".
  # Only StandardError is rescued so signals and exits are not swallowed.
  raise "invalid AMQP_URL: #{amqp_url.inspect} (#{e})"
end

#amqp_url ⇒ String

Will return a URL to the AMQP broker to use. Will get this from the ENV variable AMQP_URL if present.



63
64
65
# File 'lib/henchman.rb', line 63

# Return the URL of the AMQP broker to use.
#
# The value is memoized: the first call reads the AMQP_URL environment
# variable (falling back to "amqp://localhost/") unless a URL was already
# assigned through #amqp_url=.
#
# @return [String] the broker URL.
def amqp_url
  @amqp_url ||= ENV.fetch("AMQP_URL", "amqp://localhost/")
end

#amqp_url=(url) ⇒ String



70
71
72
# File 'lib/henchman.rb', line 70

# Set the URL of the AMQP broker to use.
#
# Stores the value in @amqp_url, the same variable #amqp_url memoizes into,
# so an assigned URL takes precedence over the AMQP_URL environment variable.
#
# @param [String] url the broker URL.
# @return [String] the assigned URL.
def amqp_url=(url)
  @amqp_url = url
end

#apublish(exchange_name, message, headers = {}) ⇒ EM::Deferrable

Publish a message to multiple consumers asynchronously.



288
289
290
291
292
293
294
295
296
# File 'lib/henchman.rb', line 288

# Publish a message to multiple consumers asynchronously.
#
# The message is encoded with MultiJson and published on the fanout exchange
# named exchange_name.
#
# @param [String] exchange_name the name of the fanout exchange to publish to.
# @param [Object] message the payload handed to MultiJson.encode.
# @param [Hash] headers the headers sent along with the message.
# @return [EM::Deferrable] a deferrable marked :succeeded once the publish
#   callback has run.
def apublish(exchange_name, message, headers = {})
  EM::DefaultDeferrable.new.tap do |outcome|
    with_fanout_exchange(exchange_name) do |exchange|
      payload = MultiJson.encode(message)
      exchange.publish(payload, headers: headers) do
        outcome.set_deferred_status(:succeeded)
      end
    end
  end
end

#channel_options ⇒ Hash

Will return the default options to use when creating channels.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.



128
129
130
131
132
133
# File 'lib/henchman.rb', line 128

# Return the default options used when creating channels.
#
# The Hash is memoized, so mutating the returned object changes the options
# used by later calls.
#
# @return [Hash] the channel options (initially prefetch 1, auto recovery on).
def channel_options
  @channel_options ||= { prefetch: 1, auto_recovery: true }
end

#enqueue(queue_name, message, headers = {}) ⇒ Object

Enqueue a message synchronously.



250
251
252
# File 'lib/henchman.rb', line 250

# Enqueue a message synchronously.
#
# Starts the asynchronous enqueue via #aenqueue and hands the resulting
# deferrable to EM::Synchrony.sync.
#
# @param [String] queue_name the name of the queue to enqueue on.
# @param [Object] message the message to enqueue.
# @param [Hash] headers the headers sent along with the message.
# @return [Object] whatever EM::Synchrony.sync returns for the deferrable.
def enqueue(queue_name, message, headers = {})
  pending = aenqueue(queue_name, message, headers)
  EM::Synchrony.sync(pending)
end

#error(&block) ⇒ Object

Define an error handler.



54
55
56
# File 'lib/henchman.rb', line 54

# Define an error handler.
#
# Replaces the handler stored in @@error_handler with the given block.
#
# @param [Proc] block the block to use as error handler.
# @return [Proc] the stored block.
def error(&block)
  @@error_handler = block
end

#exchange_options ⇒ Hash

Will return the default options to use when creating exchanges.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.



115
116
117
118
119
# File 'lib/henchman.rb', line 115

# Return the default options used when creating exchanges.
#
# The Hash is memoized, so mutating the returned object changes the options
# used by later calls.
#
# @return [Hash] the exchange options (initially auto delete on).
def exchange_options
  @exchange_options ||= { auto_delete: true }
end

#log(msg) ⇒ Object

Log a message.



39
40
# File 'lib/henchman.rb', line 39

# Log a message through the configured log handler.
#
# Passes msg to the block stored in @@logger (the default one, or whatever
# was last installed through #logger). Does nothing when no handler is set,
# e.g. after #logger was called without a block.
#
# @param [String] msg the message to log.
# @return [Object, nil] the handler's return value, or nil without a handler.
def log(msg)
  @@logger.call(msg) if @@logger
end

#logger(&block) ⇒ Object

Define a log handler.



30
31
32
# File 'lib/henchman.rb', line 30

# Define a log handler.
#
# Replaces the handler stored in @@logger with the given block.
#
# @param [Proc] block the block to use as log handler.
# @return [Proc] the stored block.
def logger(&block)
  @@logger = block
end

#publish(exchange_name, message, headers = {}) ⇒ Object

Publish a message to multiple consumers synchronously.



277
278
279
# File 'lib/henchman.rb', line 277

# Publish a message to multiple consumers synchronously.
#
# Starts the asynchronous publish via #apublish and hands the resulting
# deferrable to EM::Synchrony.sync.
#
# @param [String] exchange_name the name of the exchange to publish to.
# @param [Object] message the message to publish.
# @param [Hash] headers the headers sent along with the message.
# @return [Object] whatever EM::Synchrony.sync returns for the deferrable.
def publish(exchange_name, message, headers = {})
  pending = apublish(exchange_name, message, headers)
  EM::Synchrony.sync(pending)
end

#queue_options ⇒ Hash

Will return the default options to use when creating queues.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.



101
102
103
104
105
106
# File 'lib/henchman.rb', line 101

# Return the default options used when creating queues.
#
# The Hash is memoized, so mutating the returned object changes the options
# used by later calls.
#
# @return [Hash] the queue options (initially durable and auto delete on).
def queue_options
  @queue_options ||= { durable: true, auto_delete: true }
end

#stop! ⇒ Object

Will stop and deactivate Henchman.



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/henchman.rb', line 138

# Stop and deactivate Henchman.
#
# Asks for the channel and the connection to be closed (each through its
# with_* helper), clears the shared @@channel / @@connection references and
# finally calls AMQP.stop.
#
# @return [Object] whatever AMQP.stop returns.
def stop!
  with_channel { |open_channel| open_channel.close }
  @@channel = nil

  with_connection { |open_connection| open_connection.close }
  @@connection = nil

  AMQP.stop
end

#with_channel(&block) ⇒ Object

Will yield an open and ready channel.



177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/henchman.rb', line 177

# Yield an open and ready channel to the given block.
#
# Runs inside #with_connection. A new AMQP::Channel is created with
# #channel_options when no channel exists yet or the current one reports the
# status :closed; otherwise the shared @@channel is used as is.
#
# @yield [channel] the shared channel, handed over from its once_open callback.
def with_channel(&block)
  with_connection do |connection|
    # Create the shared channel lazily, or replace one that has been closed.
    @@channel = AMQP::Channel.new(connection, channel_options) if @@channel.nil? || @@channel.status == :closed
    # On a channel error: log it and ask the channel to reuse itself.
    @@channel.on_error do |channel, channel_close|
      log("#{self} reinitializing #{channel} due to #{channel_close}")
      channel.reuse
    end
    # Hand the channel to the caller's block from the once_open callback.
    @@channel.once_open do 
      yield @@channel
    end
  end
end

#with_connection(&block) ⇒ Object

Will yield an open and ready connection.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/henchman.rb', line 155

# Yield an open and ready connection to the given block.
#
# A new connection is created with AMQP.connect(#amqp_options) when no
# connection exists yet or the current one reports the status :closed;
# otherwise the shared @@connection is used as is. The handlers below are
# registered on every call.
#
# @yield [connection] the shared connection, handed over from its on_open callback.
def with_connection(&block)
  # Create the shared connection lazily, or replace one that has been closed.
  @@connection = AMQP.connect(amqp_options) if @@connection.nil? || @@connection.status == :closed
  # On TCP connection loss: log and try to reconnect.
  @@connection.on_tcp_connection_loss do
    log("#{self} reconnecting")
    @@connection.reconnect
  end
  # Log once the connection has recovered.
  @@connection.on_recovery do
    log("#{self} reconnected!")
  end 
  # Connection-level errors are raised with the reply text reported by the broker.
  @@connection.on_error do |connection, connection_close|
    raise "#{connection}: #{connection_close.reply_text}"
  end
  # Hand the connection to the caller's block from the on_open callback.
  @@connection.on_open do 
    yield @@connection
  end
end

#with_direct_exchange(&block) ⇒ Object

Will yield an open and ready direct exchange.



195
196
197
198
199
# File 'lib/henchman.rb', line 195

# Yield an open and ready direct exchange.
#
# Declares the exchange named AMQ::Protocol::EMPTY_STRING on the shared
# channel with #exchange_options and forwards the given block to the
# declaration call.
#
# @yield the block is passed on to channel.direct unchanged.
def with_direct_exchange(&block)
  with_channel { |ch| ch.direct(AMQ::Protocol::EMPTY_STRING, exchange_options, &block) }
end

#with_fanout_exchange(exchange_name, &block) ⇒ Object

Will yield an open and ready fanout exchange.



207
208
209
210
211
# File 'lib/henchman.rb', line 207

# Yield an open and ready fanout exchange.
#
# Declares the fanout exchange called exchange_name on the shared channel
# with #exchange_options and forwards the given block to the declaration call.
#
# @param [String] exchange_name the name of the exchange to create or find.
# @yield the block is passed on to channel.fanout unchanged.
def with_fanout_exchange(exchange_name, &block)
  with_channel { |ch| ch.fanout(exchange_name, exchange_options, &block) }
end

#with_fanout_queue(exchange_name, &block) ⇒ Object

Will yield an open and ready queue bound to an open and ready fanout exchange.



219
220
221
222
223
224
225
226
227
228
229
# File 'lib/henchman.rb', line 219

# Yield an open and ready queue bound to an open and ready fanout exchange.
#
# Declares a queue without an explicit name on the shared channel, binds it
# to the fanout exchange called exchange_name and yields the queue once the
# bind callback runs.
#
# @param [String] exchange_name the name of the fanout exchange to bind to.
# @yield [queue] the bound queue.
def with_fanout_queue(exchange_name, &block)
  with_channel do |ch|
    with_fanout_exchange(exchange_name) do |fanout|
      ch.queue do |bound_queue|
        bound_queue.bind(fanout) { yield bound_queue }
      end
    end
  end
end

#with_queue(queue_name, &block) ⇒ Object

Will yield an open and ready queue.



236
237
238
239
240
241
242
# File 'lib/henchman.rb', line 236

# Yield an open and ready queue.
#
# Declares the queue called queue_name on the shared channel with
# #queue_options and yields it from the declaration callback. Only the queue
# itself is passed on to the caller's block.
#
# @param [String] queue_name the name of the queue to create or find.
# @yield [queue] the declared queue.
def with_queue(queue_name, &block)
  with_channel do |ch|
    ch.queue(queue_name, queue_options) { |declared| yield declared }
  end
end