Class: GovukMessageQueueConsumer::Consumer
- Inherits:
-
Object
- Object
- GovukMessageQueueConsumer::Consumer
- Defined in:
- lib/govuk_message_queue_consumer/consumer.rb
Direct Known Subclasses
Constant Summary collapse
- NUMBER_OF_MESSAGES_TO_PREFETCH =
Only fetch one message at a time on the channel.
By default, queues will grab messages eagerly, which reduces latency. However, that also means that if multiple workers are running one worker can starve another of work. We’re not expecting a high throughput on this queue, and a small bit of latency isn’t a problem, so we fetch one at a time to share the work evenly.
1
Instance Method Summary collapse
-
#initialize(queue_name:, exchange_name:, processor:, routing_key: '#') ⇒ Consumer
constructor
A new instance of Consumer.
- #run ⇒ Object
Constructor Details
#initialize(queue_name:, exchange_name:, processor:, routing_key: '#') ⇒ Consumer
Returns a new instance of Consumer.
14 15 16 17 18 19 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 14 def initialize(queue_name:, exchange_name:, processor:, routing_key: '#') @queue_name = queue_name @exchange_name = exchange_name @processor = processor @routing_key = routing_key end |
Instance Method Details
#run ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 21 def run queue.subscribe(block: true, manual_ack: true) do |delivery_info, headers, payload| begin = Message.new(payload, headers, delivery_info) processor_chain.process() rescue Exception => e $stderr.puts "rabbitmq_consumer: aborting due to unhandled exception in processor #{e.class}: #{e.message}" exit(1) # ensure rabbitmq requeues outstanding messages end end end |