Class: EventQ::RabbitMq::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb

Constant Summary collapse

# Key placed in the `arguments` hash handed to `channel.queue`; its value is
# the name of an exchange (see create_delay_queue and get_retry_queue).
X_DEAD_LETTER_EXCHANGE = 'x-dead-letter-exchange'.freeze
# Key placed in the same `arguments` hash; its value is the message TTL for
# the queue being declared.
X_MESSAGE_TTL = 'x-message-ttl'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueManager

Returns a new instance of QueueManager.



10
11
12
13
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 10

# Builds a queue manager whose queues/exchanges are declared durable by
# default and that holds a fresh EventQ::EventRaisedExchange instance.
def initialize
  # Default durability flag; forwarded as the `durable` option when queues and
  # the direct exchange are declared by this class.
  @durable = true
  @event_raised_exchange = EventQ::EventRaisedExchange.new
end

Instance Attribute Details

#durable ⇒ Object

Returns the value of attribute durable.



8
9
10
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 8

# Reader for the durability flag.
#
# The flag is set to `true` by the constructor and is forwarded as the
# `durable` option when this class declares queues and the direct exchange.
#
# @return [Object] the current value of @durable
def durable
  @durable
end

Instance Method Details

#create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object



84
85
86
87
88
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 84

# Declares the delay queue for +queue+ on +channel+.
#
# The queue is named "<queue name>.<delay>.delay" and is declared with a
# dead-letter exchange of +dlx_name+ and a message TTL of `delay * 1000`.
#
# @param channel [Object] object responding to `queue(name, options)`
# @param queue [Object] queue descriptor passed to EventQ.create_queue_name
# @param dlx_name [String] value stored under the x-dead-letter-exchange key
# @param delay [Numeric] delay; multiplied by 1000 for x-message-ttl
#   (presumably seconds converted to milliseconds — confirm with callers)
# @return [Object] whatever `channel.queue` returns
def create_delay_queue(channel, queue, dlx_name, delay = 0)
  delay_queue_name = "#{EventQ.create_queue_name(queue)}.#{delay}.delay"
  queue_arguments = {
    X_DEAD_LETTER_EXCHANGE => dlx_name,
    X_MESSAGE_TTL => delay * 1000
  }

  channel.queue(delay_queue_name, durable: @durable, arguments: queue_arguments)
end

#get_delay_exchange(channel, queue, delay) ⇒ Object



58
59
60
61
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 58

# Declares the direct exchange used for a given delay on +queue+.
#
# The exchange is named "<queue name>.<delay>.d.ex"; no options are passed
# to `channel.direct`.
#
# @param channel [Object] object responding to `direct(name)`
# @param queue [Object] queue descriptor passed to EventQ.create_queue_name
# @param delay [Object] delay value interpolated into the exchange name
# @return [Object] whatever `channel.direct` returns
def get_delay_exchange(channel, queue, delay)
  exchange_name = "#{EventQ.create_queue_name(queue)}.#{delay}.d.ex"
  channel.direct(exchange_name)
end

#get_exchange(channel, exchange) ⇒ Object



90
91
92
93
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 90

# Declares a direct exchange for +exchange+ on +channel+.
#
# The name comes from EventQ.create_exchange_name and the exchange is
# declared with the manager's durability flag.
#
# @param channel [Object] object responding to `direct(name, options)`
# @param exchange [Object] exchange descriptor passed to EventQ.create_exchange_name
# @return [Object] whatever `channel.direct` returns
def get_exchange(channel, exchange)
  channel.direct(EventQ.create_exchange_name(exchange), durable: @durable)
end

#get_queue(channel, queue) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 15

# Declares the main queue for +queue+ and wires up its bindings.
#
# Steps, in order: declare the queue, obtain the subscriber exchange, and —
# only when `queue.allow_retry` is truthy — obtain the retry exchange and
# retry queue and bind them together. Finally the main queue is bound to the
# subscriber exchange.
#
# @param channel [Object] channel used for every declaration
# @param queue [Object] queue descriptor; must respond to `allow_retry`
# @return [Object] the declared main queue (result of `channel.queue`)
def get_queue(channel, queue)
  # get/create the queue
  main_queue = channel.queue(EventQ.create_queue_name(queue), durable: @durable)
  subscriber_exchange = get_subscriber_exchange(channel, queue)

  if queue.allow_retry
    retry_exchange = get_retry_exchange(channel, queue)
    get_retry_queue(channel, queue).bind(retry_exchange)
  end

  main_queue.bind(subscriber_exchange)
  main_queue
end

#get_queue_exchange(channel, queue) ⇒ Object



43
44
45
46
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 43

# Declares the fanout exchange associated with +queue+.
#
# The name is built from EventQ.create_exchange_name(queue) with an ".ex"
# suffix; no options are passed to `channel.fanout`.
#
# @param channel [Object] object responding to `fanout(name)`
# @param queue [Object] descriptor passed to EventQ.create_exchange_name
# @return [Object] whatever `channel.fanout` returns
def get_queue_exchange(channel, queue)
  base_name = EventQ.create_exchange_name(queue)
  fanout_name = "#{base_name}.ex"
  channel.fanout(fanout_name)
end

#get_retry_exchange(channel, queue) ⇒ Object



48
49
50
51
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 48

# Declares the fanout exchange used for retries of +queue+.
#
# The exchange is named "<queue name>.r.ex"; no options are passed to
# `channel.fanout`.
#
# @param channel [Object] object responding to `fanout(name)`
# @param queue [Object] queue descriptor passed to EventQ.create_queue_name
# @return [Object] whatever `channel.fanout` returns
def get_retry_exchange(channel, queue)
  retry_exchange_name = "#{EventQ.create_queue_name(queue)}.r.ex"
  channel.fanout(retry_exchange_name)
end

#get_retry_queue(channel, queue) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 63

# Declares the retry queue ("<queue name>.r") for +queue+.
#
# The retry queue dead-letters into the queue's subscriber exchange and uses
# a message TTL of `queue.max_retry_delay` when back-off is enabled, or
# `queue.retry_delay` otherwise.
#
# @param channel [Object] channel used to declare the queue
# @param queue [Object] queue descriptor; must respond to
#   `allow_retry_back_off`, `retry_delay` and `max_retry_delay`
# @return [Object] whatever `channel.queue` returns
def get_retry_queue(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)
  _queue_name = EventQ.create_queue_name(queue)

  # Strict `== true` is kept on purpose: only a literal `true` selects the
  # back-off TTL, exactly as before; any other value uses the plain delay.
  message_ttl = queue.allow_retry_back_off == true ? queue.max_retry_delay : queue.retry_delay

  EventQ.logger.debug { "[#{self.class}] - Requesting retry queue. x-dead-letter-exchange: #{subscriber_exchange.name} | x-message-ttl: #{message_ttl}" }

  channel.queue("#{_queue_name}.r", durable: @durable,
                arguments: { X_DEAD_LETTER_EXCHANGE => subscriber_exchange.name, X_MESSAGE_TTL => message_ttl })
end

#get_subscriber_exchange(channel, queue) ⇒ Object



53
54
55
56
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 53

# Declares the fanout exchange that feeds +queue+.
#
# The exchange is named "<queue name>.ex"; no options are passed to
# `channel.fanout`.
#
# @param channel [Object] object responding to `fanout(name)`
# @param queue [Object] queue descriptor passed to EventQ.create_queue_name
# @return [Object] whatever `channel.fanout` returns
def get_subscriber_exchange(channel, queue)
  subscriber_exchange_name = "#{EventQ.create_queue_name(queue)}.ex"
  channel.fanout(subscriber_exchange_name)
end

#pop_message(queue:) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 35

# Pops a single message from +queue+ with manual acknowledgement.
#
# Calls `queue.pop` with `manual_ack` and `block` both set to true and
# destructures the result into three values; the middle one is not used.
#
# @param queue [Object] object responding to `pop(options)` and returning a
#   three-element result (presumably a Bunny::Queue — confirm with callers)
# @return [Array] `[delivery_tag, payload]`, or `[nil, nil]` when the first
#   element returned by `pop` is nil (no message)
def pop_message(queue:)
  delivery_info, _properties, payload = queue.pop({ :manual_ack => true, :block => true })
  return [nil, nil] if delivery_info.nil?

  [delivery_info.delivery_tag, payload]
end