Class: EventQ::RabbitMq::QueueManager
- Inherits:
-
Object
- Object
- EventQ::RabbitMq::QueueManager
- Defined in:
- lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb
Constant Summary collapse
- X_DEAD_LETTER_EXCHANGE =
'x-dead-letter-exchange'.freeze
- X_MESSAGE_TTL =
'x-message-ttl'.freeze
Instance Attribute Summary collapse
-
#durable ⇒ Object
Returns the value of attribute durable.
Instance Method Summary collapse
- #create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object
- #get_delay_exchange(channel, queue, delay) ⇒ Object
- #get_exchange(channel, exchange) ⇒ Object
- #get_queue(channel, queue) ⇒ Object
- #get_queue_exchange(channel, queue) ⇒ Object
- #get_retry_exchange(channel, queue) ⇒ Object
- #get_retry_queue(channel, queue) ⇒ Object
- #get_subscriber_exchange(channel, queue) ⇒ Object
-
#initialize ⇒ QueueManager
constructor
A new instance of QueueManager.
- #pop_message(queue:) ⇒ Object
Constructor Details
#initialize ⇒ QueueManager
Returns a new instance of QueueManager.
10 11 12 13 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 10

# Builds a new QueueManager.
#
# Sets the durable flag to true (it is later passed as the +durable+ option
# when queues and the plain exchange are declared) and creates the
# EventQ::EventRaisedExchange instance held by this manager.
def initialize
  @durable = true
  @event_raised_exchange = EventQ::EventRaisedExchange.new
end
Instance Attribute Details
#durable ⇒ Object
Returns the value of attribute durable.
8 9 10 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 8

# Reader for the durable flag.
#
# @return [Object] the current value of @durable
def durable
  @durable
end
Instance Method Details
#create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object
84 85 86 87 88 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 84

# Declares the delay queue for +queue+ on +channel+.
#
# The queue is named "<queue name>.<delay>.delay" and is declared with
# +dlx_name+ as its x-dead-letter-exchange and +delay * 1000+ as its
# x-message-ttl (presumably seconds converted to milliseconds - confirm
# against callers).
#
# @param channel [Object] channel responding to #queue
# @param queue [Object] queue definition passed to EventQ.create_queue_name
# @param dlx_name [String] name of the dead-letter exchange
# @param delay [Numeric] delay value, defaults to 0
# @return [Object] whatever channel.queue returns
def create_delay_queue(channel, queue, dlx_name, delay = 0)
  base_name = EventQ.create_queue_name(queue)
  delay_queue_name = "#{base_name}.#{delay}.delay"
  queue_arguments = {
    X_DEAD_LETTER_EXCHANGE => dlx_name,
    X_MESSAGE_TTL => delay * 1000
  }

  channel.queue(delay_queue_name, durable: @durable, arguments: queue_arguments)
end
#get_delay_exchange(channel, queue, delay) ⇒ Object
58 59 60 61 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 58

# Declares the direct exchange used for a given delay of +queue+.
#
# The exchange is named "<queue name>.<delay>.d.ex". No options are passed
# to channel.direct.
#
# @param channel [Object] channel responding to #direct
# @param queue [Object] queue definition passed to EventQ.create_queue_name
# @param delay [Object] delay value interpolated into the exchange name
# @return [Object] whatever channel.direct returns
def get_delay_exchange(channel, queue, delay)
  channel.direct("#{EventQ.create_queue_name(queue)}.#{delay}.d.ex")
end
#get_exchange(channel, exchange) ⇒ Object
90 91 92 93 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 90

# Declares the direct exchange for +exchange+, passing the manager's
# durable flag as the +durable+ option.
#
# @param channel [Object] channel responding to #direct
# @param exchange [Object] exchange definition passed to EventQ.create_exchange_name
# @return [Object] whatever channel.direct returns
def get_exchange(channel, exchange)
  channel.direct(EventQ.create_exchange_name(exchange), durable: @durable)
end
#get_queue(channel, queue) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 15

# Gets/creates the queue for +queue+ and wires up its bindings.
#
# The queue is declared with the manager's durable flag and bound to its
# subscriber exchange. When +queue.allow_retry+ is truthy, the retry queue
# is also requested and bound to the retry exchange.
#
# @param channel [Object] channel responding to #queue
# @param queue [Object] queue definition responding to #allow_retry
# @return [Object] the queue object returned by channel.queue
def get_queue(channel, queue)
  # get/create the queue
  main_queue = channel.queue(EventQ.create_queue_name(queue), durable: @durable)
  fanout = get_subscriber_exchange(channel, queue)

  if queue.allow_retry
    retry_exchange = get_retry_exchange(channel, queue)
    get_retry_queue(channel, queue).bind(retry_exchange)
  end

  main_queue.bind(fanout)
  main_queue
end
#get_queue_exchange(channel, queue) ⇒ Object
43 44 45 46 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 43

# Declares the fanout exchange named "<exchange name>.ex", where the
# exchange name is derived from +queue+ via EventQ.create_exchange_name.
#
# @param channel [Object] channel responding to #fanout
# @param queue [Object] definition passed to EventQ.create_exchange_name
# @return [Object] whatever channel.fanout returns
def get_queue_exchange(channel, queue)
  fanout_name = "#{EventQ.create_exchange_name(queue)}.ex"
  channel.fanout(fanout_name)
end
#get_retry_exchange(channel, queue) ⇒ Object
48 49 50 51 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 48

# Declares the fanout exchange for retries of +queue+, named
# "<queue name>.r.ex".
#
# @param channel [Object] channel responding to #fanout
# @param queue [Object] queue definition passed to EventQ.create_queue_name
# @return [Object] whatever channel.fanout returns
def get_retry_exchange(channel, queue)
  channel.fanout("#{EventQ.create_queue_name(queue)}.r.ex")
end
#get_retry_queue(channel, queue) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 63

# Declares the retry queue for +queue+, named "<queue name>.r".
#
# The retry queue dead-letters into the queue's subscriber exchange. Its
# x-message-ttl is +queue.max_retry_delay+ when +queue.allow_retry_back_off+
# is exactly +true+, otherwise +queue.retry_delay+.
#
# @param channel [Object] channel responding to #queue
# @param queue [Object] queue definition responding to #allow_retry_back_off,
#   #max_retry_delay and #retry_delay
# @return [Object] whatever channel.queue returns
def get_retry_queue(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)
  retry_queue_name = "#{EventQ.create_queue_name(queue)}.r"

  # The comparison against the literal `true` is intentional: any other value
  # (including truthy non-booleans) keeps the fixed retry delay, exactly as
  # the previous two-branch implementation did.
  message_ttl = queue.allow_retry_back_off == true ? queue.max_retry_delay : queue.retry_delay

  EventQ.logger.debug { "[#{self.class}] - Requesting retry queue. x-dead-letter-exchange: #{subscriber_exchange.name} | x-message-ttl: #{message_ttl}" }

  channel.queue(
    retry_queue_name,
    durable: @durable,
    arguments: {
      X_DEAD_LETTER_EXCHANGE => subscriber_exchange.name,
      X_MESSAGE_TTL => message_ttl
    }
  )
end
#get_subscriber_exchange(channel, queue) ⇒ Object
53 54 55 56 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 53

# Declares the fanout exchange that feeds +queue+, named "<queue name>.ex".
#
# @param channel [Object] channel responding to #fanout
# @param queue [Object] queue definition passed to EventQ.create_queue_name
# @return [Object] whatever channel.fanout returns
def get_subscriber_exchange(channel, queue)
  exchange_name = "#{EventQ.create_queue_name(queue)}.ex"
  channel.fanout(exchange_name)
end
#pop_message(queue:) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 35

# Pops a single message from +queue+, requesting manual acknowledgement and
# passing +block: true+ to the underlying pop call.
#
# @param queue [Object] queue responding to #pop, which is expected to return
#   a three-element result whose first element exposes #delivery_tag
# @return [Array] +[delivery_tag, payload]+, or +[nil, nil]+ when the first
#   element returned by pop is nil
def pop_message(queue:)
  # The options are passed as an explicit hash, as in the original call.
  delivery, _properties, payload = queue.pop({ manual_ack: true, block: true })
  return [nil, nil] if delivery.nil?

  [delivery.delivery_tag, payload]
end