Class: WhoCan::Pinger
- Inherits:
-
Object
- Object
- WhoCan::Pinger
- Includes:
- Deferred::Accessors, Logging
- Defined in:
- lib/who_can/pinger.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Class Method Summary collapse
Instance Method Summary collapse
- #handle_response(header, payload) ⇒ Object
-
#initialize(connection) ⇒ Pinger
constructor
A new instance of Pinger.
-
#ping!(exchange, timeout = 5, &callback) ⇒ Object
Returns immediately. The given callback is invoked later with the headers and payload of the “winning” message (the first ping reply we receive back from the workers).
- #start!(&blk) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(connection) ⇒ Pinger
Returns a new instance of Pinger.
14 15 16 17 18 |
# File 'lib/who_can/pinger.rb', line 14
#
# Builds a pinger bound to the given connection. The channel and the reply
# queue are not created here; #start! sets them up.
#
# @param connection [Object] handed to AMQP::Channel.new by #start!
def initialize(connection)
  @queue       = nil
  @needs_reply = {} # msg_id => deferred still waiting for a reply (see #ping!)
  @connection  = connection
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
8 9 10 |
# File 'lib/who_can/pinger.rb', line 8
#
# @return [Object, nil] the channel stored by #start! once it has opened;
#   nil before that
def channel
  @channel
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/who_can/pinger.rb', line 8
#
# @return [Object] the connection this pinger was constructed with
def connection
  @connection
end
Class Method Details
.create_msg_id ⇒ Object
10 11 12 |
# File 'lib/who_can/pinger.rb', line 10
#
# Generates a fresh message id: a random UUID whose dashes are replaced by
# underscores, prefixed with "who_can_".
#
# @return [String]
def self.create_msg_id
  uuid = UUIDTools::UUID.random_create.to_s
  "who_can_#{uuid.tr('-', '_')}"
end
Instance Method Details
#handle_response(header, payload) ⇒ Object
73 74 75 76 77 78 79 80 |
# File 'lib/who_can/pinger.rb', line 73
#
# Handles a message delivered to the reply queue. If a deferred is still
# registered under the reply's id, it is removed from the pending table and
# succeeded (on the EventMachine reactor) with the header and payload.
# Replies whose id is unknown are ignored, so only the first reply for a
# given ping resolves its deferred.
#
# @param header [Object] message metadata; must respond to #correlation_id
#   and #message_id
# @param payload [Object] message body
def handle_response(header, payload)
  # NOTE(review): the accessor name after `header.` was missing from the
  # listing this was restored from. #ping! publishes with :message_id, so the
  # reply is assumed to echo that id either as its correlation_id (the usual
  # AMQP request/reply convention) or as its own message_id. Confirm against
  # the responder implementation.
  reply_id = header.correlation_id || header.message_id
  logger.debug { "handling a response (#{reply_id}) with a payload of: #{payload}" }

  deferred = @needs_reply.delete(reply_id)
  return unless deferred

  EM.schedule { deferred.succeed(header, payload) }
end
#ping!(exchange, timeout = 5, &callback) ⇒ Object
Returns immediately. The given callback is invoked later with the headers and payload of the “winning” message (the first ping reply we receive back from the workers).
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/who_can/pinger.rb', line 51
#
# Publishes a 'PING' to the named fanout exchange and returns immediately.
# The returned deferred is registered under a freshly generated message id so
# that #handle_response can succeed it with the header and payload of the
# first reply received.
#
# Errors raised while setting up or publishing the ping (including the
# channel not being open) do not propagate: they are logged and reported by
# failing the returned deferred. Only StandardError is handled this way;
# signals and exit requests are left alone.
#
# @param exchange [String] name of the fanout exchange to ping
# @param timeout [Numeric] value passed to the deferred's #timeout
# @param callback [Proc] attached as the deferred's success callback
# @return [Deferred::Default] the deferred tracking this ping
def ping!(exchange, timeout=5, &callback)
  # Created before anything that can raise, so the rescue below always has a
  # deferred to fail.
  deferred = Deferred::Default.new

  begin
    logger.debug { "sending a ping to #{exchange}" }
    deferred.callback(&callback)
    deferred.timeout(timeout)

    raise "Channel not opened" unless channel.open?

    msg_id = self.class.create_msg_id
    @needs_reply[msg_id] = deferred

    # Forget the pending entry if the ping fails (publish error, or --
    # presumably -- the timeout set above firing); otherwise unanswered pings
    # would accumulate in @needs_reply forever.
    # NOTE(review): assumes Deferred::Default offers EM::Deferrable-style
    # #errback -- confirm.
    deferred.errback { @needs_reply.delete(msg_id) }

    deferred.errback_on_exception do
      logger.debug {"sending the ping to #{exchange} with a reply to of #{@queue.name}"}

      # XXX: the block given to publish here appears to be a bug w/ amqp gem,
      # without it an exception is raised
      ping_exchange = channel.fanout(exchange)
      ping_exchange.publish('PING', :reply_to => @queue.name, :message_id => msg_id) do
        logger.debug { "actually sent the ping to #{exchange}" }
      end
    end
  rescue StandardError => e
    logger.error {"received an exception on ping!: #{e.to_std_format}"}
    deferred.fail(e)
  end

  deferred
end
#start!(&blk) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/who_can/pinger.rb', line 21
#
# Schedules, on the EventMachine reactor, the opening of a channel on the
# connection and of an exclusive, auto-delete, server-named reply queue.
# Replies arriving on that queue are routed to #handle_response. When the
# subscription is confirmed, the on_start deferred is succeeded.
#
# @param blk [Proc] passed to on_start (presumably registered as the start
#   callback by Deferred::Accessors -- confirm)
# @return [Object] whatever on_start(&blk) returns
def start!(&blk)
  EM.schedule do
    AMQP::Channel.new(connection) do |opened_channel, _|
      @channel = opened_channel
      logger.debug { "pinger channel is now open" }

      opened_channel.on_error do |*details|
        logger.error { "channel on_error called with: #{details.inspect}" }
      end

      opened_channel.queue('', :exclusive => true, :auto_delete => true) do |reply_queue|
        logger.debug { "queue opened: #{reply_queue.name} exclusive, auto_delete" }
        @queue = reply_queue

        # Invoked by the :confirm hook of subscribe below.
        subscription_confirmed = lambda do |*_ignored|
          logger.debug {"calling back to with_queue"}
          on_start.succeed
        end

        logger.debug {"subscribing to the response queue"}
        @queue.subscribe(:confirm => subscription_confirmed, &method(:handle_response))
      end
    end
  end

  on_start(&blk)
end