Class: GovukMessageQueueConsumer::Consumer

Inherits:
Object
Defined in:
lib/govuk_message_queue_consumer/consumer.rb

Direct Known Subclasses

TestConsumer

Constant Summary

NUMBER_OF_MESSAGES_TO_PREFETCH = 1

Only fetch one message at a time on the channel.

By default, messages are delivered to consumers eagerly, which reduces latency. However, when multiple workers are running, one worker can end up holding messages that an idle worker could have handled, starving it of work. We’re not expecting high throughput on this queue, and a little latency isn’t a problem, so we fetch one message at a time to share the work evenly.
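The trade-off described above can be illustrated with a toy model. This is a sketch only: it does not talk to RabbitMQ, the helper names (`eager_finish_time`, `one_at_a_time_finish_time`) and the per-message costs are hypothetical, and "eager" is modelled as simple round-robin delivery up front.

```ruby
# Toy model only: not RabbitMQ, just the two dispatch policies in miniature.
# costs[i] is the time worker i needs per message (hypothetical numbers).

# Eager delivery: messages are dealt out round-robin up front, regardless of
# how busy each worker currently is.
def eager_finish_time(message_count, costs)
  handled = Array.new(costs.size, 0)
  message_count.times { |m| handled[m % costs.size] += 1 }
  handled.zip(costs).map { |count, cost| count * cost }.max
end

# One at a time: each message goes to whichever worker becomes free first.
def one_at_a_time_finish_time(message_count, costs)
  free_at = Array.new(costs.size, 0)
  message_count.times do
    worker = free_at.each_index.min_by { |i| free_at[i] }
    free_at[worker] += costs[worker]
  end
  free_at.max
end

costs = [5, 1] # worker 0 is slow, worker 1 is fast
eager = eager_finish_time(10, costs)
shared = one_at_a_time_finish_time(10, costs)
puts "eager: #{eager}, one at a time: #{shared}"
```

In this run the eager policy leaves the slow worker with half the backlog (finishing at time 25) while the fast worker sits idle, whereas handing out one message at a time finishes at time 10. With the Bunny client, a limit like this is typically applied with `Bunny::Channel#prefetch`; how this class applies the constant is not shown on this page.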

Instance Method Summary

Constructor Details

#initialize(queue_name:, exchange_name:, processor:, routing_key: '#') ⇒ Consumer

Returns a new instance of Consumer.



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 14

def initialize(queue_name:, exchange_name:, processor:, routing_key: '#')
  @queue_name = queue_name
  @exchange_name = exchange_name
  @processor = processor
  @routing_key = routing_key
end
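A minimal sketch of what a `processor:` argument might look like. It assumes the consumer ends up calling `process(message)` on the processor (suggested by `processor_chain.process(message)` in `#run`) and that the message exposes `payload` and `ack`; neither detail is confirmed on this page. `FakeMessage`, `LoggingProcessor`, and the queue and exchange names are hypothetical.

```ruby
# Hypothetical stand-in for the gem's Message class, exposing only what this
# sketch uses. The real class may differ.
FakeMessage = Struct.new(:payload, :acked) do
  def ack
    self.acked = true
  end
end

# Hypothetical processor. Assumes the consumer calls #process(message).
class LoggingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message.payload
    message.ack # assumption: the processor is responsible for acknowledging
  end
end

processor = LoggingProcessor.new
message = FakeMessage.new({ "title" => "Example" }, false)
processor.process(message)

# With the gem loaded and a broker available, the wiring might look like:
#
#   GovukMessageQueueConsumer::Consumer.new(
#     queue_name: "example_queue",        # hypothetical name
#     exchange_name: "example_exchange",  # hypothetical name
#     processor: processor
#   ).run
```

The default `routing_key` of `'#'` is left out here; on an AMQP topic exchange that pattern matches every routing key.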

Instance Method Details

#run ⇒ Object



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 21

def run
  queue.subscribe(block: true, manual_ack: true) do |delivery_info, headers, payload|
    begin
      message = Message.new(payload, headers, delivery_info)
      processor_chain.process(message)
    rescue Exception => e
      $stderr.puts "rabbitmq_consumer: aborting due to unhandled exception in processor #{e.class}: #{e.message}"
      exit(1) # ensure rabbitmq requeues outstanding messages
    end
  end
end
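The error path in `#run` can be isolated in a small sketch. `handle_delivery` is a hypothetical stand-in for the body of the subscribe block, with a lambda in place of the processor chain and an injectable error stream so the behaviour can be observed without a broker.

```ruby
# Hypothetical stand-in for the subscribe block body shown above.
# `processor` is any callable; `err` defaults to $stderr as in the original.
def handle_delivery(payload, processor, err: $stderr)
  processor.call(payload)
rescue Exception => e # deliberately broad, mirroring the original code
  err.puts "rabbitmq_consumer: aborting due to unhandled exception in processor #{e.class}: #{e.message}"
  exit(1) # raises SystemExit; the delivery is never acknowledged
end

# Success path: the processor's return value is passed through.
result = handle_delivery("ok", ->(payload) { payload.upcase })
puts result
```

Because the subscription uses `manual_ack: true`, a delivery that was never acknowledged is returned to the queue by RabbitMQ once the consumer's connection closes, which is what exiting the process brings about. Note that `exit(1)` works by raising `SystemExit`, so callers that rescue it (as a test harness might) can inspect the status instead of terminating.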