Class: PublishingPlatformMessageQueueConsumer::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/publishing_platform_message_queue_consumer/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_name:, processor:, rabbitmq_connection: Bunny.new, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer

Create a new consumer

Parameters:

  • queue_name (String)

    Your queue name. This is specific to your application, and should already exist and have a binding configured via Terraform.

  • processor (Object)

    An object that responds to ‘process`

  • rabbitmq_connection (Object) (defaults to: Bunny.new)

    A Bunny connection object derived from ‘Bunny.new`

  • logger (Object) (defaults to: Logger.new($stderr))

    A Logger object for emitting errors (to stderr by default)

  • worker_threads (Number) (defaults to: 1)

    Size of the worker thread pool. Defaults to 1.

  • prefetch (Number) (defaults to: 1)

    Maximum number of unacked messages to allow on the channel. See www.rabbitmq.com/docs/consumer-prefetch Defaults to 1.



16
17
18
19
20
21
22
23
# File 'lib/publishing_platform_message_queue_consumer/consumer.rb', line 16

def initialize(queue_name:, processor:, rabbitmq_connection: Bunny.new, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1)
  @queue_name = queue_name
  @processor = processor
  @rabbitmq_connection = rabbitmq_connection
  @logger = logger
  @worker_threads = worker_threads
  @prefetch = prefetch
end

Instance Method Details

#run(subscribe_opts: {}) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/publishing_platform_message_queue_consumer/consumer.rb', line 25

def run(subscribe_opts: {})
  @rabbitmq_connection.start

  subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts)
  queue.subscribe(subscribe_opts) do |delivery_info, headers, payload|
    message = Message.new(payload, headers, delivery_info)
    message_consumer.process(message)
  rescue StandardError => e
    PublishingPlatformError.notify(e) if defined?(PublishingPlatformError)
    @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}"
    exit(1) # Ensure rabbitmq requeues outstanding messages
  end
rescue SignalException => e
  PublishingPlatformError.notify(e) if defined?(PublishingPlatformError) && e.message != "SIGTERM"

  exit
end