Class: GovukMessageQueueConsumer::Consumer
- Inherits: Object
- Defined in: lib/govuk_message_queue_consumer/consumer.rb
Constant Summary
- NUMBER_OF_MESSAGES_TO_PREFETCH = 1
  Only fetch one message at a time on the channel.
  By default, queues grab messages eagerly, which reduces latency. However, it also means that if multiple workers are running, one worker can starve the others of work. We're not expecting high throughput on this queue, and a small amount of latency isn't a problem, so we fetch one message at a time to share the work evenly.
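The starvation the docstring describes can be sketched with a toy model of RabbitMQ's dispatch rule: the broker hands each message to the first worker whose unacknowledged-message count is below its prefetch limit. All names here are illustrative, not the gem's code; the model assumes a burst of messages arrives before any worker has acknowledged one.

```ruby
# Toy model of prefetch-limited dispatch. Each message goes to the first
# worker with spare prefetch capacity; undelivered messages wait on the queue.
def dispatch(message_count, prefetch:, workers: 2)
  unacked = Array.new(workers, 0)   # unacked messages held by each worker
  assigned = Array.new(workers, 0)  # total messages delivered to each worker
  message_count.times do
    idx = unacked.index { |n| n < prefetch }
    break unless idx # every worker is at its prefetch limit; messages queue up
    unacked[idx] += 1
    assigned[idx] += 1
  end
  assigned
end

dispatch(10, prefetch: 10) # => [10, 0]  one worker hoards the whole burst
dispatch(10, prefetch: 1)  # => [1, 1]   each holds one; the rest wait on the queue
```

With a large prefetch the first worker grabs everything; with a prefetch of 1 each worker holds a single message and the broker shares out the rest as acks come in.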
Instance Method Summary
- #initialize(queue_name:, exchange_name:, processor:, routing_key: '#', statsd_client: NullStatsd.new) ⇒ Consumer (constructor)
  Create a new consumer.
- #run ⇒ Object
Constructor Details
#initialize(queue_name:, exchange_name:, processor:, routing_key: '#', statsd_client: NullStatsd.new) ⇒ Consumer
Create a new consumer.
  # File 'lib/govuk_message_queue_consumer/consumer.rb', line 22

  def initialize(queue_name:, exchange_name:, processor:, routing_key: '#', statsd_client: NullStatsd.new)
    @queue_name = queue_name
    @exchange_name = exchange_name
    @processor = processor
    @routing_key = routing_key
    @statsd_client = statsd_client
  end
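A standalone sketch of the constructor's keyword-argument pattern. `NullStatsd` is stubbed here for illustration (the real gem supplies its own null-object statsd client), and the queue and exchange names are made up:

```ruby
# Stub standing in for the gem's null-object statsd client.
class NullStatsd
  def increment(_key); end
end

# Mirror of the constructor shown above, runnable on its own.
class Consumer
  attr_reader :queue_name, :exchange_name, :processor, :routing_key, :statsd_client

  def initialize(queue_name:, exchange_name:, processor:, routing_key: '#', statsd_client: NullStatsd.new)
    @queue_name = queue_name
    @exchange_name = exchange_name
    @processor = processor
    @routing_key = routing_key
    @statsd_client = statsd_client
  end
end

consumer = Consumer.new(
  queue_name: "my_queue",          # illustrative names
  exchange_name: "my_exchange",
  processor: Object.new,           # any object responding to #process(message)
)
consumer.routing_key # => "#" (the default binds to every routing key)
```

Only the first three keywords are required; `routing_key` defaults to the match-everything pattern `'#'` and metrics are silently discarded unless a real statsd client is passed in.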
Instance Method Details
#run ⇒ Object
  # File 'lib/govuk_message_queue_consumer/consumer.rb', line 30

  def run
    queue.subscribe(block: true, manual_ack: true) do |delivery_info, headers, payload|
      begin
        message = Message.new(payload, headers, delivery_info)
        @statsd_client.increment("#{@queue_name}.started")
        processor_chain.process(message)
        @statsd_client.increment("#{@queue_name}.#{message.status}")
      rescue Exception => e
        @statsd_client.increment("#{@queue_name}.uncaught_exception")
        Airbrake.notify_or_ignore(e) if defined?(Airbrake)
        $stderr.puts "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
        exit(1) # Ensure rabbitmq requeues outstanding messages
      end
    end
  end
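The metric keys `#run` emits can be traced with a recording stub in place of a real statsd client. The `Message` struct and its `status` value are simplified stand-ins for illustration; the real message's status reflects how the processor disposed of it:

```ruby
# Recording stub: captures every incremented key instead of sending it anywhere.
class RecordingStatsd
  attr_reader :keys

  def initialize
    @keys = []
  end

  def increment(key)
    @keys << key
  end
end

# Simplified stand-in for the gem's Message; "acked" is an assumed status value.
Message = Struct.new(:payload, :headers, :delivery_info) do
  def status
    "acked"
  end
end

statsd = RecordingStatsd.new
queue_name = "my_queue"
message = Message.new("payload", {}, nil)

# The happy path of #run increments two keys around the processor call:
statsd.increment("#{queue_name}.started")
# ... processor_chain.process(message) would run here ...
statsd.increment("#{queue_name}.#{message.status}")

statsd.keys # => ["my_queue.started", "my_queue.acked"]
```

On an uncaught exception the method instead increments `"#{queue_name}.uncaught_exception"` and exits with status 1, so RabbitMQ requeues any outstanding unacknowledged messages.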