Class: EventQ::RabbitMq::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq_rabbitmq/rabbitmq_queue_manager.rb

Constant Summary collapse

# Key used in the queue +arguments+ hash to name the dead-letter exchange
# (see create_delay_queue and get_retry_queue).
X_DEAD_LETTER_EXCHANGE = 'x-dead-letter-exchange'.freeze
# Key used in the queue +arguments+ hash to carry the message TTL value
# (see create_delay_queue and get_retry_queue).
X_MESSAGE_TTL = 'x-message-ttl'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueManager

Returns a new instance of QueueManager.



10
11
12
13
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 10

# Sets up a new manager.
#
# Turns the +durable+ flag on (the queue/exchange helpers pass it along as
# their +durable+ option) and stores a fresh EventQ::EventRaisedExchange in
# @event_raised_exchange.
def initialize
  @durable = true
  @event_raised_exchange = EventQ::EventRaisedExchange.new
end

Instance Attribute Details

#durable ⇒ Object

Returns the value of attribute durable.



8
9
10
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 8

# Reader for the durability flag.
#
# @return [Object] the current value of @durable; #initialize sets it to
#   +true+, and the queue/exchange helpers pass it as their +durable+ option.
def durable
  @durable
end

Instance Method Details

#create_delay_queue(channel, queue, dlx_name, delay = 0) ⇒ Object



92
93
94
95
96
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 92

# Declares the delay queue belonging to +queue+ for a given delay.
#
# The queue is named "<queue name>.<delay>.delay", where the queue name comes
# from EventQ.create_queue_name. It is declared with the manager's durability
# flag, +dlx_name+ as its dead-letter exchange and +delay * 1000+ as its
# message TTL (presumably seconds converted to milliseconds; confirm with
# callers).
#
# @param channel [#queue] channel used to declare the queue
# @param queue [#name] queue definition the delay queue is derived from
# @param dlx_name [Object] value stored under the dead-letter-exchange argument
# @param delay [Numeric] delay value; defaults to 0
# @return [Object] whatever +channel.queue+ returns
def create_delay_queue(channel, queue, dlx_name, delay = 0)
  delay_queue_name = "#{EventQ.create_queue_name(queue.name)}.#{delay}.delay"
  queue_arguments = {
    X_DEAD_LETTER_EXCHANGE => dlx_name,
    X_MESSAGE_TTL => delay * 1000
  }

  channel.queue(delay_queue_name, durable: @durable, arguments: queue_arguments)
end

#get_delay_exchange(channel, queue, delay) ⇒ Object



66
67
68
69
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 66

# Declares (or fetches) the direct exchange used for a specific delay.
#
# The exchange is named "<queue name>.<delay>.d.ex", where the queue name
# comes from EventQ.create_queue_name. No options are passed to
# +channel.direct+.
#
# @param channel [#direct] channel used to declare the exchange
# @param queue [#name] queue definition the exchange is derived from
# @param delay [Object] delay value interpolated into the exchange name
# @return [Object] whatever +channel.direct+ returns
def get_delay_exchange(channel, queue, delay)
  base_name = EventQ.create_queue_name(queue.name)
  exchange_name = "#{base_name}.#{delay}.d.ex"

  channel.direct(exchange_name)
end

#get_exchange(channel, exchange) ⇒ Object



98
99
100
101
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 98

# Declares (or fetches) the direct exchange for an exchange definition.
#
# The name is produced by EventQ.create_exchange_name and the exchange is
# declared with the manager's durability flag.
#
# @param channel [#direct] channel used to declare the exchange
# @param exchange [#name] exchange definition
# @return [Object] whatever +channel.direct+ returns
def get_exchange(channel, exchange)
  channel.direct(EventQ.create_exchange_name(exchange.name), durable: @durable)
end

#get_queue(channel, queue) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 15

# Declares (or fetches) the main queue for a queue definition.
#
# The queue is named via EventQ.create_queue_name and declared with the
# manager's durability flag. When +queue.allow_retry+ is truthy, the retry
# queue is also declared and bound to the retry exchange, and the main queue
# is bound to the subscriber exchange.
#
# @param channel [#queue] channel used for all declarations
# @param queue [#name, #allow_retry] queue definition
# @return [Object] the main queue returned by +channel.queue+
def get_queue(channel, queue)
  main_queue = channel.queue(EventQ.create_queue_name(queue.name), durable: @durable)
  return main_queue unless queue.allow_retry

  retry_exchange = get_retry_exchange(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)

  # Failed messages wait in the retry queue, which is fed by the retry exchange.
  get_retry_queue(channel, queue).bind(retry_exchange)
  main_queue.bind(subscriber_exchange)

  main_queue
end

#get_queue_exchange(channel, queue) ⇒ Object



51
52
53
54
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 51

# Declares (or fetches) the fanout exchange named after a queue definition.
#
# The name is EventQ.create_exchange_name applied to the queue's name, with
# ".ex" appended. No options are passed to +channel.fanout+.
#
# @param channel [#fanout] channel used to declare the exchange
# @param queue [#name] queue definition
# @return [Object] whatever +channel.fanout+ returns
def get_queue_exchange(channel, queue)
  exchange_name = "#{EventQ.create_exchange_name(queue.name)}.ex"
  channel.fanout(exchange_name)
end

#get_retry_exchange(channel, queue) ⇒ Object



56
57
58
59
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 56

# Declares (or fetches) the fanout exchange used for retries of a queue.
#
# The name is EventQ.create_queue_name applied to the queue's name, with
# ".r.ex" appended. No options are passed to +channel.fanout+.
#
# @param channel [#fanout] channel used to declare the exchange
# @param queue [#name] queue definition
# @return [Object] whatever +channel.fanout+ returns
def get_retry_exchange(channel, queue)
  exchange_name = "#{EventQ.create_queue_name(queue.name)}.r.ex"
  channel.fanout(exchange_name)
end

#get_retry_queue(channel, queue) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 71

# Declares (or fetches) the retry queue for a queue definition.
#
# The queue is named "<queue name>.r" and declared with the manager's
# durability flag. Its dead-letter exchange is the subscriber exchange, and
# its message TTL is +queue.max_retry_delay+ when
# +queue.allow_retry_back_off+ is exactly +true+, otherwise
# +queue.retry_delay+. The chosen settings are logged at debug level.
#
# @param channel [#queue] channel used for the declarations
# @param queue [#name, #allow_retry_back_off, #retry_delay, #max_retry_delay]
#   queue definition
# @return [Object] whatever +channel.queue+ returns
def get_retry_queue(channel, queue)
  subscriber_exchange = get_subscriber_exchange(channel, queue)
  retry_queue_name = "#{EventQ.create_queue_name(queue.name)}.r"

  # Strict comparison on purpose: only a literal true selects the back-off TTL.
  ttl = queue.allow_retry_back_off == true ? queue.max_retry_delay : queue.retry_delay

  EventQ.logger.debug { "[#{self.class}] - Requesting retry queue. x-dead-letter-exchange: #{subscriber_exchange.name} | x-message-ttl: #{ttl}" }

  channel.queue(
    retry_queue_name,
    durable: @durable,
    arguments: { X_DEAD_LETTER_EXCHANGE => subscriber_exchange.name, X_MESSAGE_TTL => ttl }
  )
end

#get_subscriber_exchange(channel, queue) ⇒ Object



61
62
63
64
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 61

# Declares (or fetches) the fanout exchange that feeds a subscriber queue.
#
# The name is EventQ.create_queue_name applied to the queue's name, with
# ".ex" appended. No options are passed to +channel.fanout+.
#
# @param channel [#fanout] channel used to declare the exchange
# @param queue [#name] queue definition
# @return [Object] whatever +channel.fanout+ returns
def get_subscriber_exchange(channel, queue)
  exchange_name = "#{EventQ.create_queue_name(queue.name)}.ex"
  channel.fanout(exchange_name)
end

#pop_message(queue:) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 35

# Pops a single message from +queue+ and returns its delivery tag and payload.
#
# When RUBY_PLATFORM matches /java/ the queue is popped with
# +:ack => true+ and a two-element result (headers, payload) is expected;
# otherwise it is popped with +:manual_ack => true+ and a three-element
# result (headers, properties, payload) is expected. Both variants also pass
# +:block => true+.
#
# @param queue [#pop] queue object to pop from
# @return [Array] +[delivery_tag, payload]+, or +[nil, nil]+ when the first
#   element returned by +queue.pop+ is nil
def pop_message(queue:)
  if RUBY_PLATFORM =~ /java/
    delivery, body = queue.pop({ :ack => true, :block => true })
  else
    # The middle element (message properties) is not needed here.
    delivery, _properties, body = queue.pop({ :manual_ack => true, :block => true })
  end

  return [nil, nil] if delivery.nil?

  [delivery.delivery_tag, body]
end