Class: Tochtli::ReplyQueue
- Inherits:
-
Object
- Object
- Tochtli::ReplyQueue
- Defined in:
- lib/tochtli/reply_queue.rb
Defined Under Namespace
Classes: Consumer
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #handle_reply(reply, correlation_id = nil) ⇒ Object
- #handle_timeout(original_message) ⇒ Object
-
#initialize(rabbit_connection, logger = nil) ⇒ ReplyQueue
constructor
A new instance of ReplyQueue.
- #name ⇒ Object
- #on_delivery(delivery_info, metadata, payload) ⇒ Object
- #reconnect(channel) ⇒ Object
- #register_message_handler(message, handler = nil, timeout = nil, &block) ⇒ Object
- #subscribe ⇒ Object
Constructor Details
#initialize(rabbit_connection, logger = nil) ⇒ ReplyQueue
Returns a new instance of ReplyQueue.
5 6 7 8 9 10 11 12 |
# File 'lib/tochtli/reply_queue.rb', line 5
# Builds a reply queue on top of the given connection and subscribes immediately.
#
# @param rabbit_connection [Object] connection wrapper; must respond to
#   +logger+ when no explicit logger is passed
# @param logger [Object, nil] logger to use; falls back to the connection's logger
def initialize(rabbit_connection, logger=nil)
  @connection = rabbit_connection
  @logger = logger || rabbit_connection.logger
  # Both maps are keyed by message id / correlation id.
  @message_handlers = {}
  @message_timeout_threads = {}
  subscribe
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
3 4 5 |
# File 'lib/tochtli/reply_queue.rb', line 3
# @return [Object] the connection passed to the constructor
def connection
  @connection
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
3 4 5 |
# File 'lib/tochtli/reply_queue.rb', line 3
# @return [Object] the logger chosen in the constructor
def logger
  @logger
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
3 4 5 |
# File 'lib/tochtli/reply_queue.rb', line 3
# @return [Object] the queue created by #subscribe
def queue
  @queue
end
Instance Method Details
#handle_reply(reply, correlation_id = nil) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/tochtli/reply_queue.rb', line 73
# Routes a reply to the handler registered under its correlation id.
#
# The handler (and any pending timeout thread) is removed before dispatch, so
# each registered handler is invoked at most once.
#
# @param reply [Object] a Tochtli::Message, a Tochtli::ErrorMessage or an Exception
# @param correlation_id [Object, nil] id of the original message; when nil and
#   +reply+ is a Tochtli::Message it is read from +reply.properties.correlation_id+
# @raise [ArgumentError] when no correlation id can be determined
def handle_reply(reply, correlation_id=nil)
  correlation_id ||= reply.properties.correlation_id if reply.is_a?(Tochtli::Message)
  raise ArgumentError, "Correlated message ID expected" unless correlation_id

  if (handler = @message_handlers.delete(correlation_id))
    if (timeout_thread = @message_timeout_threads.delete(correlation_id))
      timeout_thread.kill
      timeout_thread.join # make sure timeout thread is dead
    end

    if !reply.is_a?(Tochtli::ErrorMessage) && !reply.is_a?(Exception)
      begin
        handler.call(reply)
      rescue Exception => e
        # Deliberately broad: whatever the handler raises is logged and
        # reported back through the handler's own error callback.
        logger.error e
        logger.error e.backtrace.join("\n")
        handler.on_error(e)
      end
    else
      handler.on_error(reply)
    end
  else
    logger.error "[Tochtli::ReplyQueue] Unexpected message delivery '#{correlation_id}':\n\t#{reply.inspect})"
  end
end
#handle_timeout(original_message) ⇒ Object
103 104 105 106 107 108 109 110 |
# File 'lib/tochtli/reply_queue.rb', line 103
# Notifies the handler registered for +original_message+ that no reply arrived.
#
# Removes both the handler and the timeout-thread entry for the message id
# before calling the handler's +on_timeout+.
#
# @param original_message [Object] the message that timed out; must respond to +id+
# @return [Object] whatever the handler's +on_timeout+ returns
# @raise [RuntimeError] when no handler is registered for the message id
def handle_timeout(original_message)
  if (handler = @message_handlers.delete(original_message.id))
    @message_timeout_threads.delete(original_message.id)
    handler.on_timeout
  else
    raise "Internal error, timeout handler not found for message: #{original_message.id}, #{original_message.inspect}"
  end
end
#name ⇒ Object
14 15 16 |
# File 'lib/tochtli/reply_queue.rb', line 14
# @return [Object] the name reported by the underlying queue
def name
  @queue.name
end
#on_delivery(delivery_info, metadata, payload) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/tochtli/reply_queue.rb', line 57
# Delivery callback: rebuilds a reply object from the delivery and hands it to
# #handle_reply. Any failure is logged and swallowed so it never propagates to
# the caller of this callback.
#
# @param delivery_info [Object] delivery information (unused here)
# @param metadata [Object] message properties; +metadata.type+ names the reply class
# @param payload [String] JSON document holding the reply attributes
def on_delivery(delivery_info, metadata, payload)
  class_name = metadata.type.camelize.gsub(/[^a-zA-Z0-9\:]/, '_') # basic sanity
  # SECURITY NOTE(review): the class name comes from message metadata and is
  # passed to eval. The character filter above limits what can be expressed,
  # but a constant lookup (e.g. Object.const_get) would avoid evaluating
  # received text as code altogether -- consider switching.
  reply_class = eval(class_name)
  reply = reply_class.new({}, metadata)
  attributes = JSON.parse(payload)
  reply.attributes = attributes

  logger.debug "[#{Time.now} AMQP] Replay for #{reply.properties.correlation_id}: #{reply.inspect}"

  handle_reply reply
rescue Exception => e
  # Kept broad on purpose: eval can raise SyntaxError (a ScriptError, not a
  # StandardError) for a malformed class name, and that must be logged too.
  logger.error e
  logger.error e.backtrace.join("\n")
end
#reconnect(channel) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/tochtli/reply_queue.rb', line 32
# Re-binds the reply queue after a channel recovery.
#
# Does nothing when no queue has been created yet. Previously the trailing
# +@queue.name+ ran outside the +if @queue+ guard and raised NoMethodError in
# that case.
#
# @param channel [Object] the recovered channel; used for logging and to
#   create the exchange the queue is bound to
# @return [Object, nil] the current queue name, or nil when there is no queue
def reconnect(channel)
  return unless @queue

  channel.connection.logger.debug "Recovering reply queue binding (original: #{@original_queue_name}, current: #{@queue.name})"

  # Re-bind queue after name change (auto-generated new on server has been re-generated)
  exchange = @connection.create_exchange(channel)
  @queue.unbind exchange, routing_key: @original_queue_name
  @queue.bind exchange, routing_key: @queue.name

  @original_queue_name = @queue.name
end
#register_message_handler(message, handler = nil, timeout = nil, &block) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/tochtli/reply_queue.rb', line 45
# Registers the handler to be invoked when a reply to +message+ arrives.
#
# When +timeout+ is given, a thread is started that sleeps for that long and
# then calls #handle_timeout for the message; the thread is stored under the
# message id.
#
# @param message [Object] the outgoing message; must respond to +id+
# @param handler [Object, nil] reply handler; takes precedence over the block
# @param timeout [Numeric, nil] seconds to wait before reporting a timeout
# @yield used as the handler when +handler+ is nil
# @return [Thread, nil] the timeout thread, or nil when no timeout was requested
def register_message_handler(message, handler=nil, timeout=nil, &block)
  @message_handlers[message.id] = handler || block
  if timeout
    timeout_thread = Thread.start do
      sleep timeout
      logger.warn "[#{Time.now} AMQP] TIMEOUT on message '#{message.id}' timeout: #{timeout}"
      handle_timeout message
    end
    @message_timeout_threads[message.id] = timeout_thread
  end
end
#subscribe ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/tochtli/reply_queue.rb', line 18
# Declares the reply queue, binds it to the connection's exchange under its
# own name, and attaches a consumer that forwards deliveries to #on_delivery.
#
# @return [Object] whatever the queue's +subscribe_with+ returns
def subscribe
  channel = @connection.channel
  exchange = @connection.exchange

  # Empty name: the queue name is read back from the queue object afterwards.
  @queue = channel.queue('', exclusive: true, auto_delete: true)
  # Remembered so #reconnect can drop the old binding if the name changes.
  @original_queue_name = @queue.name
  @queue.bind exchange, routing_key: @queue.name

  @consumer = Consumer.new(self, channel, @queue)
  @consumer.on_delivery(&method(:on_delivery))

  @queue.subscribe_with(@consumer)
end