Class: Tochtli::ReplyQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/tochtli/reply_queue.rb

Defined Under Namespace

Classes: Consumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(rabbit_connection, logger = nil) ⇒ ReplyQueue

Returns a new instance of ReplyQueue.



5
6
7
8
9
10
11
12
# File 'lib/tochtli/reply_queue.rb', line 5

def initialize(rabbit_connection, logger=nil)
  @connection              = rabbit_connection
  @logger                  = logger || rabbit_connection.logger
  @message_handlers        = {}
  @message_timeout_threads = {}

  subscribe
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/tochtli/reply_queue.rb', line 3

def connection
  @connection
end

#loggerObject (readonly)

Returns the value of attribute logger.



3
4
5
# File 'lib/tochtli/reply_queue.rb', line 3

def logger
  @logger
end

#queueObject (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/tochtli/reply_queue.rb', line 3

def queue
  @queue
end

Instance Method Details

#handle_reply(reply, correlation_id = nil) ⇒ Object

Raises:

  • (ArgumentError)


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/tochtli/reply_queue.rb', line 73

def handle_reply(reply, correlation_id=nil)
  correlation_id ||= reply.properties.correlation_id if reply.is_a?(Tochtli::Message)
  raise ArgumentError, "Correlated message ID expected" unless correlation_id
  if (handler = @message_handlers.delete(correlation_id))
    if (timeout_thread = @message_timeout_threads.delete(correlation_id))
      timeout_thread.kill
      timeout_thread.join # make sure timeout thread is dead
    end

    if !reply.is_a?(Tochtli::ErrorMessage) && !reply.is_a?(Exception)

      begin

        handler.call(reply)

      rescue Exception
        logger.error $!
        logger.error $!.backtrace.join("\n")
        handler.on_error($!)
      end

    else
      handler.on_error(reply)
    end

  else
    logger.error "[Tochtli::ReplyQueue] Unexpected message delivery '#{correlation_id}':\n\t#{reply.inspect})"
  end
end

#handle_timeout(original_message) ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/tochtli/reply_queue.rb', line 103

def handle_timeout(original_message)
  if (handler = @message_handlers.delete(original_message.id))
    @message_timeout_threads.delete(original_message.id)
    handler.on_timeout original_message
  else
    raise "Internal error, timeout handler not found for message: #{original_message.id}, #{original_message.inspect}"
  end
end

#nameObject



14
15
16
# File 'lib/tochtli/reply_queue.rb', line 14

def name
  @queue.name
end

#on_delivery(delivery_info, metadata, payload) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/tochtli/reply_queue.rb', line 57

def on_delivery(delivery_info, , payload)
  class_name       = .type.camelize.gsub(/[^a-zA-Z0-9\:]/, '_') # basic sanity
  reply_class      = eval(class_name)
  reply            = reply_class.new({}, )
  attributes       = JSON.parse(payload)
  reply.attributes = attributes

  logger.debug "[#{Time.now} AMQP] Replay for #{reply.properties.correlation_id}: #{reply.inspect}"

  handle_reply reply

rescue Exception
  logger.error $!
  logger.error $!.backtrace.join("\n")
end

#reconnect(channel) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/tochtli/reply_queue.rb', line 32

def reconnect(channel)
  if @queue
    channel.connection.logger.debug "Recovering reply queue binding (original: #{@original_queue_name}, current: #{@queue.name})"

    # Re-bind queue after name change (auto-generated new on server has been re-generated)
    exchange = @connection.create_exchange(channel)
    @queue.unbind exchange, routing_key: @original_queue_name
    @queue.bind exchange, routing_key: @queue.name
  end

  @original_queue_name = @queue.name
end

#register_message_handler(message, handler = nil, timeout = nil, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/tochtli/reply_queue.rb', line 45

def register_message_handler(message, handler=nil, timeout=nil, &block)
  @message_handlers[message.id] = handler || block
  if timeout
    timeout_thread                       = Thread.start do
      sleep timeout
      logger.warn "[#{Time.now} AMQP] TIMEOUT on message '#{message.id}' timeout: #{timeout}"
      handle_timeout message
    end
    @message_timeout_threads[message.id] = timeout_thread
  end
end

#subscribeObject



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/tochtli/reply_queue.rb', line 18

def subscribe
  channel  = @connection.channel
  exchange = @connection.exchange

  @queue               = channel.queue('', exclusive: true, auto_delete: true)
  @original_queue_name = @queue.name
  @queue.bind exchange, routing_key: @queue.name

  @consumer = Consumer.new(self, channel, @queue)
  @consumer.on_delivery(&method(:on_delivery))

  @queue.subscribe_with(@consumer)
end