Class: GovukMessageQueueConsumer::Consumer

Inherits:
Object
Defined in:
lib/govuk_message_queue_consumer/consumer.rb

Defined Under Namespace

Classes: NullStatsd

Constant Summary

HANDLE_BATCHES =
false

NUMBER_OF_MESSAGES_TO_PREFETCH =
1

Only fetch one message at a time on the channel.

By default, queues will grab messages eagerly, which reduces latency. However, that also means that if multiple workers are running, one worker can starve another of work. We're not expecting high throughput on this queue, and a little latency isn't a problem, so we fetch one message at a time to share the work evenly.
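The starvation described above can be illustrated with a toy model. This is only a sketch in plain Ruby: it does not talk to RabbitMQ, the `distribute` helper and worker names are hypothetical, and it assumes a backlog is already waiting when `worker_a` subscribes just ahead of `worker_b`.

```ruby
# Toy model of prefetch, not part of the gem. Each worker in turn buffers
# up to `prefetch` messages from the backlog before the next worker gets any.
def distribute(message_count, worker_names, prefetch)
  pending = (1..message_count).to_a
  assigned = worker_names.to_h { |name| [name, []] }

  until pending.empty?
    worker_names.each do |name|
      # Array#shift(n) removes and returns at most n items ([] once drained).
      assigned[name].concat(pending.shift(prefetch))
    end
  end

  assigned
end

# A large prefetch lets the first worker buffer the whole backlog.
eager = distribute(6, %w[worker_a worker_b], 6)

# A prefetch of one hands messages out one at a time.
fair = distribute(6, %w[worker_a worker_b], 1)
```

In this model `eager` leaves `worker_b` with nothing to do, while `fair` splits the six messages evenly between the two workers.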

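Class Method Summary

  • .default_connection_from_env ⇒ Object

Instance Method Summary

  • #initialize ⇒ Consumer (constructor)

    Create a new consumer

  • #run(subscribe_opts: {}) ⇒ Object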

Constructor Details

#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new($stderr)) ⇒ Consumer

Create a new consumer

Parameters:

  • queue_name (String)

    Your queue name. This is specific to your application; the queue should already exist and have a binding created via Puppet

  • processor (Object)

    An object that responds to `process`

  • rabbitmq_connection (Object) (defaults to: Consumer.default_connection_from_env)

    A Bunny connection object derived from `Bunny.new`

  • statsd_client (Statsd) (defaults to: NullStatsd.new)

    An instance of the Statsd class

  • logger (Object) (defaults to: Logger.new($stderr))

    A Logger object for emitting errors (to stderr by default)
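As a rough illustration of the `processor` parameter, the sketch below defines an object that responds to `process`. The class names are hypothetical, and `FakeMessage` is only a stand-in: it assumes the real message object exposes `payload` (as a Hash), `ack` and `discard`, none of which is documented on this page.

```ruby
require "logger"

# Stand-in for the message handed to `process`. Hypothetical: it mimics only
# the `payload`, `ack` and `discard` calls assumed by this example.
class FakeMessage
  attr_reader :payload, :status

  def initialize(payload)
    @payload = payload
    @status = nil
  end

  def ack
    @status = :acked
  end

  def discard
    @status = :discarded
  end
end

# Hypothetical processor: any object with a `process(message)` method will do.
class ExampleProcessor
  def initialize(logger: Logger.new($stderr))
    @logger = logger
  end

  def process(message)
    if message.payload.key?("title")
      @logger.info("Processing #{message.payload['title']}")
      message.ack
    else
      # Nothing useful to do with this message, so drop it.
      message.discard
    end
  end
end

# With the gem loaded, the processor would be wired up roughly like this
# (the queue name is a placeholder):
#
#   GovukMessageQueueConsumer::Consumer.new(
#     queue_name: "some-queue",
#     processor: ExampleProcessor.new,
#   ).run

message = FakeMessage.new({ "title" => "Example" })
ExampleProcessor.new.process(message)
```

Keeping the processor free of any RabbitMQ dependency, as here, means it can be exercised in isolation with a stand-in message.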



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 30

def initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, statsd_client: NullStatsd.new, logger: Logger.new($stderr))
  @queue_name = queue_name
  @processor = processor
  @rabbitmq_connection = rabbitmq_connection
  @statsd_client = statsd_client
  @logger = logger
end

Class Method Details

.default_connection_from_env ⇒ Object



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 13

def self.default_connection_from_env
  # https://github.com/ruby-amqp/bunny/blob/066496d/docs/guides/connecting.md#paas-environments
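  # With no arguments, Bunny.new takes its connection settings from RABBITMQ_URL.
  if !ENV["RABBITMQ_URL"].to_s.empty?
    Bunny.new
  else
    # Otherwise build the connection options from the environment.
    Bunny.new(RabbitMQConfig.from_environment(ENV))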
  end
end
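The branch condition treats an unset `RABBITMQ_URL` and an empty one the same way, because `nil.to_s` is the empty string. A minimal sketch of just that decision, using a hypothetical `connection_source` helper and a plain Hash in place of `ENV`:

```ruby
# Hypothetical helper, not part of the gem: reports which branch of
# default_connection_from_env would be taken for a given environment.
def connection_source(env)
  if !env["RABBITMQ_URL"].to_s.empty?
    :rabbitmq_url     # Bunny.new with no arguments
  else
    :rabbitmq_config  # Bunny.new(RabbitMQConfig.from_environment(ENV))
  end
end

unset = connection_source({})
blank = connection_source({ "RABBITMQ_URL" => "" })
present = connection_source({ "RABBITMQ_URL" => "amqp://guest:guest@localhost:5672" })
```

Both `unset` and `blank` fall through to the `RabbitMQConfig` branch; only a non-empty URL selects the first branch.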

Instance Method Details

#run(subscribe_opts: {}) ⇒ Object



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 38

def run(subscribe_opts: {})
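  @rabbitmq_connection.start

  # Defaults: block the calling thread and require explicit acks.
  # Caller-supplied options take precedence over these defaults.
  subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts)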
  queue.subscribe(subscribe_opts) do |delivery_info, headers, payload|
    message = Message.new(payload, headers, delivery_info)
    @statsd_client.increment("#{@queue_name}.started")
    message_consumer.process(message)
    @statsd_client.increment("#{@queue_name}.#{message.status}")
  rescue SignalException => e
    @logger.error "SignalException in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  rescue StandardError => e
    @statsd_client.increment("#{@queue_name}.uncaught_exception")
    GovukError.notify(e) if defined?(GovukError)
    @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  end
end
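The counter names emitted on the success path of `run` can be seen with a small stand-alone sketch. Everything here is hypothetical scaffolding rather than the gem's own code: `RecordingStatsd` only needs to respond to `increment`, `StubMessage` assumes that acknowledging a message sets its `status` to `:acked`, and `handle` mirrors the body of the subscribe block without RabbitMQ or the error handling.

```ruby
# Hypothetical stats client that records counter names instead of sending them.
class RecordingStatsd
  attr_reader :counters

  def initialize
    @counters = Hash.new(0)
  end

  def increment(name)
    @counters[name] += 1
  end
end

# Stand-in message. Assumption: `ack` leaves `status` set to :acked.
class StubMessage
  attr_reader :status

  def ack
    @status = :acked
  end
end

# Hypothetical processor that acknowledges every message.
class AckingProcessor
  def process(message)
    message.ack
  end
end

# Mirrors the success path of the subscribe block in `run`.
def handle(message, processor:, statsd_client:, queue_name:)
  statsd_client.increment("#{queue_name}.started")
  processor.process(message)
  statsd_client.increment("#{queue_name}.#{message.status}")
end

statsd = RecordingStatsd.new
handle(
  StubMessage.new,
  processor: AckingProcessor.new,
  statsd_client: statsd,
  queue_name: "example_queue",
)
```

Under these assumptions `statsd.counters` ends up with one increment each for `example_queue.started` and `example_queue.acked`. A recording client like this can also be passed as `statsd_client:` when testing code that uses the consumer.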