Class: GovukMessageQueueConsumer::Consumer
- Inherits:
- Object
- Defined in:
- lib/govuk_message_queue_consumer/consumer.rb
Defined Under Namespace
Classes: NullStatsd
Constant Summary
- HANDLE_BATCHES =
false
- NUMBER_OF_MESSAGES_TO_PREFETCH =
1
Only fetch one message at a time on the channel.
By default, queues will grab messages eagerly, which reduces latency. However, that also means that if multiple workers are running, one worker can starve another of work. We're not expecting a high throughput on this queue, and a small bit of latency isn't a problem, so we fetch one at a time to share the work evenly.
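The reasoning behind the prefetch value can be illustrated with a toy model. This is a minimal sketch, not RabbitMQ's actual delivery algorithm: `distribute` and its tick-based loop are hypothetical, and the eager case assumes one worker subscribes to an existing backlog before the other.

```ruby
# Toy model of a broker handing a backlog of messages to workers.
# `prefetch` caps how many unacknowledged messages a worker may hold;
# nil means no limit (eager delivery). Hypothetical helper for illustration.
def distribute(message_count, worker_names, prefetch:)
  backlog = (1..message_count).to_a
  held = worker_names.to_h { |name| [name, []] } # delivered, not yet acked
  done = worker_names.to_h { |name| [name, []] } # processed and acked

  until backlog.empty? && held.values.all?(&:empty?)
    # Delivery: workers are offered messages in subscription order.
    worker_names.each do |name|
      while !backlog.empty? && (prefetch.nil? || held[name].size < prefetch)
        held[name] << backlog.shift
      end
    end
    # Processing: each worker finishes and acks one message per tick.
    worker_names.each do |name|
      done[name] << held[name].shift unless held[name].empty?
    end
  end
  done
end

eager = distribute(6, %w[a b], prefetch: nil)
fair  = distribute(6, %w[a b], prefetch: 1)
puts "eager:      #{eager}"
puts "prefetch 1: #{fair}"
```

In this model the eager run leaves worker `b` with nothing to do, while a prefetch of 1 alternates messages between the two workers.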
Class Method Summary
- .default_connection_from_env ⇒ Object
Instance Method Summary
- #initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new($stderr)) ⇒ Consumer
constructor
Create a new consumer.
- #run(subscribe_opts: {}) ⇒ Object
Constructor Details
#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new($stderr)) ⇒ Consumer
Create a new consumer.
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 30
def initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new($stderr))
  @queue_name = queue_name
  @processor = processor
  @rabbitmq_connection = rabbitmq_connection
  @statsd_client = statsd_client
  @logger = logger
end
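As a rough illustration of what might be passed as `processor:`, the sketch below defines an object that responds to `process(message)`. `LoggingProcessor`, `FakeMessage`, and the queue name are hypothetical. The `payload` and `ack` calls are assumptions about the gem's message object, which this page does not document. The consumer wiring at the end only runs when the gem is loaded.

```ruby
# Hypothetical processor: any object that responds to #process(message).
class LoggingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message.payload
    message.ack # assumed API: acknowledge so the broker does not redeliver
  end
end

# Stand-in for the gem's message object, used only to exercise the
# processor without a broker. Its fields are assumptions for this sketch.
FakeMessage = Struct.new(:payload, :status) do
  def ack
    self.status = :acked
  end
end

processor = LoggingProcessor.new
message = FakeMessage.new({ "title" => "Example" }, :status_not_set)
processor.process(message)

# Wiring into the consumer, using the keyword arguments from the
# constructor signature above. Skipped unless the gem is loaded.
if defined?(GovukMessageQueueConsumer::Consumer)
  GovukMessageQueueConsumer::Consumer.new(
    queue_name: "example_queue", # hypothetical queue name
    processor: processor,
  ).run
end
```

Leaving out `rabbitmq_connection:`, `statsd_client:`, and `logger:` falls back to the defaults shown in the signature.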
Class Method Details
.default_connection_from_env ⇒ Object
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 13
def self.default_connection_from_env
  # https://github.com/ruby-amqp/bunny/blob/066496d/docs/guides/connecting.md#paas-environments
  if !ENV["RABBITMQ_URL"].to_s.empty?
    Bunny.new
  else
    Bunny.new(RabbitMQConfig.from_environment(ENV))
  end
end
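The branch above can be exercised without Bunny or a broker. In this minimal sketch, `connection_source` is a hypothetical helper that mirrors the same condition and reports which configuration source would be chosen; the URL is an example value.

```ruby
# Mirrors the condition in default_connection_from_env and returns a label
# for the configuration source instead of opening a connection.
def connection_source(env)
  if !env["RABBITMQ_URL"].to_s.empty?
    :rabbitmq_url        # Bunny.new with no arguments reads RABBITMQ_URL itself
  else
    :individual_settings # Bunny.new(RabbitMQConfig.from_environment(env))
  end
end

with_url    = connection_source({ "RABBITMQ_URL" => "amqp://guest:guest@localhost:5672" })
empty_url   = connection_source({ "RABBITMQ_URL" => "" })
missing_url = connection_source({})

puts with_url
puts empty_url
puts missing_url
```

Because of `to_s`, an unset variable and an empty string are treated the same way: both fall through to the individual settings.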
Instance Method Details
#run(subscribe_opts: {}) ⇒ Object
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 38
def run(subscribe_opts: {})
  @rabbitmq_connection.start

  subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts)
  queue.subscribe(subscribe_opts) do |delivery_info, headers, payload|
    message = Message.new(payload, headers, delivery_info)
    @statsd_client.increment("#{@queue_name}.started")
    message_consumer.process(message)
    @statsd_client.increment("#{@queue_name}.#{message.status}")
  rescue SignalException => e
    @logger.error "SignalException in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  rescue StandardError => e
    @statsd_client.increment("#{@queue_name}.uncaught_exception")
    GovukError.notify(e) if defined?(GovukError)
    @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  end
end
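How `subscribe_opts` combines with the defaults follows from `Hash#merge`: keys supplied by the caller win, and anything not supplied keeps its default. A minimal sketch, where `effective_subscribe_opts` and the `"worker-1"` tag are hypothetical names used only for illustration:

```ruby
# The defaults that #run applies before subscribing.
DEFAULT_SUBSCRIBE_OPTS = { block: true, manual_ack: true }.freeze

# Hypothetical helper reproducing the merge performed inside #run.
def effective_subscribe_opts(subscribe_opts = {})
  DEFAULT_SUBSCRIBE_OPTS.merge(subscribe_opts)
end

defaults_only = effective_subscribe_opts
non_blocking  = effective_subscribe_opts({ block: false })
with_tag      = effective_subscribe_opts({ consumer_tag: "worker-1" })

p defaults_only
p non_blocking
p with_tag
```

Passing `block: false` overrides only that key, so `manual_ack: true` stays in effect unless the caller overrides it as well.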