Class: GovukMessageQueueConsumer::Consumer
- Inherits:
-
Object
- Object
- GovukMessageQueueConsumer::Consumer
- Defined in:
- lib/govuk_message_queue_consumer/consumer.rb
Direct Known Subclasses
Defined Under Namespace
Classes: NullStatsd
Constant Summary collapse
- HANDLE_BATCHES =
false- NUMBER_OF_MESSAGES_TO_PREFETCH =
Only fetch one message at a time on the channel.
By default, queues will grab messages eagerly, which reduces latency. However, that also means that if multiple workers are running one worker can starve another of work. We’re not expecting a high throughput on this queue, and a small bit of latency isn’t a problem, so we fetch one at a time to share the work evenly.
1
Instance Method Summary collapse
-
#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new(STDERR)) ⇒ Consumer
constructor
Create a new consumer.
- #run ⇒ Object
Constructor Details
#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new(STDERR)) ⇒ Consumer
Create a new consumer
21 22 23 24 25 26 27 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 21 def initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new(STDERR)) @queue_name = queue_name @processor = processor @rabbitmq_connection = rabbitmq_connection @statsd_client = statsd_client @logger = logger end |
Instance Method Details
#run ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 29 def run @rabbitmq_connection.start queue.subscribe(block: true, manual_ack: true) do |delivery_info, headers, payload| begin = Message.new(payload, headers, delivery_info) @statsd_client.increment("#{@queue_name}.started") .process() @statsd_client.increment("#{@queue_name}.#{.status}") rescue Exception => e @statsd_client.increment("#{@queue_name}.uncaught_exception") GovukError.notify(e) if defined?(GovukError) @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.}\n\n#{e.backtrace.join("\n")}" exit(1) # Ensure rabbitmq requeues outstanding messages end end end |