Class: EventQ::RabbitMq::QueueManager
- Inherits: Object
  - Object
    - EventQ::RabbitMq::QueueManager
- Defined in:
- lib/eventq_rabbitmq/rabbitmq_queue_manager.rb
Constant Summary collapse
- X_DEAD_LETTER_EXCHANGE =
'x-dead-letter-exchange'.freeze
- X_MESSAGE_TTL =
'x-message-ttl'.freeze
Instance Attribute Summary collapse
-
#durable ⇒ Object
Returns the value of attribute durable.
Instance Method Summary collapse
- #create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object
- #get_delay_exchange(channel, queue, delay) ⇒ Object
- #get_exchange(channel, exchange) ⇒ Object
- #get_queue(channel, queue) ⇒ Object
- #get_queue_exchange(channel, queue) ⇒ Object
- #get_retry_exchange(channel, queue) ⇒ Object
- #get_retry_queue(channel, queue) ⇒ Object
- #get_subscriber_exchange(channel, queue) ⇒ Object
-
#initialize ⇒ QueueManager
constructor
A new instance of QueueManager.
- #pop_message(queue:) ⇒ Object
Constructor Details
#initialize ⇒ QueueManager
Returns a new instance of QueueManager.
10 11 12 13 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 10

# Creates a new QueueManager.
#
# Stores a fresh EventQ::EventRaisedExchange in @event_raised_exchange and
# sets @durable to true.
def initialize
  @event_raised_exchange = EventQ::EventRaisedExchange.new
  @durable = true
end
Instance Attribute Details
#durable ⇒ Object
Returns the value of attribute durable.
8 9 10 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 8

# Reader for the durable attribute.
#
# @return [Object] the current value of @durable
def durable
  @durable
end
Instance Method Details
#create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object
92 93 94 95 96 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 92

# Declares the delay queue "<queue name>.<delay>.delay" on the channel.
#
# The queue is declared with @durable, dead-letters to +dlx_name+ and uses a
# message TTL of delay * 1000.
# NOTE(review): the * 1000 suggests +delay+ is given in seconds and converted
# to milliseconds - confirm against callers.
#
# @param channel  object responding to #queue(name, options)
# @param queue    object responding to #name
# @param dlx_name value stored under the x-dead-letter-exchange argument
# @param delay    numeric delay, defaults to 0
# @return whatever channel.queue returns
def create_delay_queue(channel, queue, dlx_name, delay=0)
  base_name = EventQ.create_queue_name(queue.name)
  queue_arguments = {
    X_DEAD_LETTER_EXCHANGE => dlx_name,
    X_MESSAGE_TTL => delay * 1000
  }

  channel.queue("#{base_name}.#{delay}.delay", durable: @durable, arguments: queue_arguments)
end
#get_delay_exchange(channel, queue, delay) ⇒ Object
66 67 68 69 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 66

# Requests the direct exchange named "<queue name>.<delay>.d.ex" from the
# channel.
#
# @param channel object responding to #direct(name)
# @param queue   object responding to #name
# @param delay   value interpolated into the exchange name
# @return whatever channel.direct returns
def get_delay_exchange(channel, queue, delay)
  exchange_name = "#{EventQ.create_queue_name(queue.name)}.#{delay}.d.ex"
  channel.direct(exchange_name)
end
#get_exchange(channel, exchange) ⇒ Object
98 99 100 101 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 98

# Requests a direct exchange from the channel, named by passing
# exchange.name through EventQ.create_exchange_name and declared with the
# manager's @durable setting.
#
# @param channel  object responding to #direct(name, options)
# @param exchange object responding to #name
# @return whatever channel.direct returns
def get_exchange(channel, exchange)
  exchange_name = EventQ.create_exchange_name(exchange.name)
  channel.direct(exchange_name, durable: @durable)
end
#get_queue(channel, queue) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 15

# Gets or creates the queue for +queue+ and, when queue.allow_retry is
# truthy, wires up its retry topology: the retry queue is bound to the retry
# exchange and the main queue is bound to the subscriber exchange.
#
# @param channel object responding to #queue(name, options)
# @param queue   object responding to #name and #allow_retry
# @return the queue object returned by channel.queue
def get_queue(channel, queue)
  queue_name = EventQ.create_queue_name(queue.name)

  # get/create the queue
  main_queue = channel.queue(queue_name, durable: @durable)
  return main_queue unless queue.allow_retry

  retry_exchange = get_retry_exchange(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)
  retry_queue = get_retry_queue(channel, queue)

  retry_queue.bind(retry_exchange)
  main_queue.bind(subscriber_exchange)

  main_queue
end
#get_queue_exchange(channel, queue) ⇒ Object
51 52 53 54 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 51

# Requests the fanout exchange named "<exchange name>.ex", where the exchange
# name is EventQ.create_exchange_name applied to queue.name.
#
# @param channel object responding to #fanout(name)
# @param queue   object responding to #name
# @return whatever channel.fanout returns
def get_queue_exchange(channel, queue)
  exchange_name = "#{EventQ.create_exchange_name(queue.name)}.ex"
  channel.fanout(exchange_name)
end
#get_retry_exchange(channel, queue) ⇒ Object
56 57 58 59 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 56

# Requests the fanout exchange named "<queue name>.r.ex" from the channel.
#
# @param channel object responding to #fanout(name)
# @param queue   object responding to #name
# @return whatever channel.fanout returns
def get_retry_exchange(channel, queue)
  exchange_name = "#{EventQ.create_queue_name(queue.name)}.r.ex"
  channel.fanout(exchange_name)
end
#get_retry_queue(channel, queue) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 71

# Gets or creates the retry queue "<queue name>.r".
#
# The queue is declared with @durable, dead-letters to the subscriber
# exchange, and uses a message TTL of queue.max_retry_delay when
# queue.allow_retry_back_off is exactly true, otherwise queue.retry_delay.
# NOTE(review): unlike create_delay_queue, the delay is passed through
# without the * 1000 scaling - presumably it is already in milliseconds;
# confirm.
#
# @param channel object responding to #queue(name, options)
# @param queue   object responding to #name, #allow_retry_back_off,
#                #retry_delay and #max_retry_delay
# @return whatever channel.queue returns
def get_retry_queue(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)
  queue_name = EventQ.create_queue_name(queue.name)

  # Strict comparison kept on purpose: only a literal `true` selects the
  # back-off delay, exactly as before.
  message_ttl = queue.allow_retry_back_off == true ? queue.max_retry_delay : queue.retry_delay

  EventQ.logger.debug do
    "[#{self.class}] - Requesting retry queue. x-dead-letter-exchange: #{subscriber_exchange.name} | x-message-ttl: #{message_ttl}"
  end

  channel.queue(
    "#{queue_name}.r",
    durable: @durable,
    arguments: {
      X_DEAD_LETTER_EXCHANGE => subscriber_exchange.name,
      X_MESSAGE_TTL => message_ttl
    }
  )
end
#get_subscriber_exchange(channel, queue) ⇒ Object
61 62 63 64 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 61

# Requests the fanout exchange named "<queue name>.ex" from the channel.
#
# @param channel object responding to #fanout(name)
# @param queue   object responding to #name
# @return whatever channel.fanout returns
def get_subscriber_exchange(channel, queue)
  exchange_name = "#{EventQ.create_queue_name(queue.name)}.ex"
  channel.fanout(exchange_name)
end
#pop_message(queue:) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 35

# Pops a single message from +queue+ using a blocking pop with
# acknowledgement enabled.
#
# When RUBY_PLATFORM matches /java/ the pop is called with
# { ack: true, block: true } and is expected to yield [headers, payload];
# otherwise it is called with { manual_ack: true, block: true } and is
# expected to yield [headers, properties, payload].
# NOTE(review): presumably the two branches target different client
# libraries on JRuby and MRI - confirm.
#
# @param queue object responding to #pop(options)
# @return [Array] [delivery_tag, payload], or [nil, nil] when the pop
#   returned no headers
def pop_message(queue:)
  if RUBY_PLATFORM =~ /java/
    headers, payload = queue.pop({ :ack => true, :block => true })
  else
    # properties are returned by the client but not needed here
    headers, _properties, payload = queue.pop({ :manual_ack => true, :block => true })
  end

  return [nil, nil] if headers.nil?

  [headers.delivery_tag, payload]
end