Class: GovukMessageQueueConsumer::Consumer

Inherits:
Object
Defined in:
lib/govuk_message_queue_consumer/consumer.rb

Constructor Details

#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer

Create a new consumer

Parameters:

  • queue_name (String)

    Your queue name. This is specific to your application, and should already exist and have a binding configured via Terraform.

  • processor (Object)

    An object that responds to `process`.

  • rabbitmq_connection (Object) (defaults to: Consumer.default_connection_from_env)

    A Bunny connection object derived from `Bunny.new`.

  • logger (Object) (defaults to: Logger.new($stderr))

    A Logger object for emitting errors (to stderr by default)

  • worker_threads (Number) (defaults to: 1)

    Size of the worker thread pool. Defaults to 1.

  • prefetch (Number) (defaults to: 1)

    Maximum number of unacknowledged messages allowed on the channel. See www.rabbitmq.com/docs/consumer-prefetch. Defaults to 1.



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 25

def initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1)
  @queue_name = queue_name
  @processor = processor
  @rabbitmq_connection = rabbitmq_connection
  @logger = logger
  @worker_threads = worker_threads
  @prefetch = prefetch
end
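
As a rough illustration of the `processor` argument, the sketch below defines an object that responds to `process`. `LoggingProcessor` and `FakeMessage` are hypothetical names, and the `payload` and `ack` methods on the message are assumptions about the Message object, which is not documented in this section.

```ruby
# Hypothetical stand-in for the message handed to the processor. The real
# Message class is not documented here, so `payload` and `ack` are assumptions.
FakeMessage = Struct.new(:payload) do
  attr_reader :acked

  def ack
    @acked = true
  end
end

# Hypothetical processor: any object that responds to `process` will do.
class LoggingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message.payload
    message.ack # assumed acknowledgement call, since `run` subscribes with manual_ack
  end
end

processor = LoggingProcessor.new
message = FakeMessage.new('{"title":"Example"}')
processor.process(message)
```

With the gem loaded and RabbitMQ reachable, an object like this would be passed as `GovukMessageQueueConsumer::Consumer.new(queue_name: "some_queue", processor: processor)`, where `"some_queue"` is a placeholder name.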

Class Method Details

.default_connection_from_env ⇒ Object



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 3

def self.default_connection_from_env
  # https://github.com/ruby-amqp/bunny/blob/066496d/docs/guides/connecting.md#paas-environments
  if !ENV["RABBITMQ_URL"].to_s.empty?
    Bunny.new
  else
    Bunny.new(RabbitMQConfig.from_environment(ENV))
  end
end
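
The branch in this method depends on `ENV["RABBITMQ_URL"].to_s.empty?`, which treats an unset variable and an empty string the same way. The sketch below isolates that check. `connection_source` is a hypothetical helper, not part of the gem, and it returns a symbol naming the branch instead of building a Bunny connection.

```ruby
# Hypothetical helper mirroring the condition in default_connection_from_env.
# It reports which branch would be taken rather than opening a connection.
def connection_source(env)
  if !env["RABBITMQ_URL"].to_s.empty?
    :rabbitmq_url    # corresponds to Bunny.new with no arguments
  else
    :rabbitmq_config # corresponds to Bunny.new(RabbitMQConfig.from_environment(ENV))
  end
end

connection_source({ "RABBITMQ_URL" => "amqp://localhost" }) # URL branch
connection_source({})                                      # unset: config branch
connection_source({ "RABBITMQ_URL" => "" })                # empty: config branch
```

Calling `to_s` first means a `nil` value never raises on `empty?`, so an unset variable falls through to the `RabbitMQConfig` branch.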

Instance Method Details

#run(subscribe_opts: {}) ⇒ Object



# File 'lib/govuk_message_queue_consumer/consumer.rb', line 34

def run(subscribe_opts: {})
  @rabbitmq_connection.start

  subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts)
  queue.subscribe(subscribe_opts) do |delivery_info, headers, payload|
    message = Message.new(payload, headers, delivery_info)
    message_consumer.process(message)
  rescue StandardError => e
    GovukError.notify(e) if defined?(GovukError)
    @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  end
rescue SignalException => e
  GovukError.notify(e) if defined?(GovukError) && e.message != "SIGTERM"

  exit
end
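
The `subscribe_opts` handling in `run` relies on `Hash#merge`, so any key the caller supplies replaces the matching default and the remaining defaults are kept. The sketch below reproduces only that merge. `effective_subscribe_opts` is a hypothetical helper used for illustration and does not exist in the gem.

```ruby
# Hypothetical helper reproducing the option merge performed inside `run`.
def effective_subscribe_opts(subscribe_opts = {})
  # Defaults copied from the listing above; caller-supplied keys win.
  { block: true, manual_ack: true }.merge(subscribe_opts)
end

effective_subscribe_opts({})               # both defaults kept
effective_subscribe_opts({ block: false }) # :block overridden, :manual_ack kept
```

A caller that does not want `run` to block the calling thread (in a test, for example) could presumably pass `subscribe_opts: { block: false }`, assuming Bunny's `subscribe` honours that option as its documentation describes.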