Class: GovukMessageQueueConsumer::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/govuk_message_queue_consumer/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, processor) ⇒ Consumer

Returns a new instance of Consumer.



6
7
8
9
10
11
12
13
14
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 6

def initialize(config, processor)
  @processor = HeartbeatProcessor.new(processor)

  @config = config.with_indifferent_access
  @queue_name = @config.fetch(:queue)
  @bindings = { @config.fetch(:exchange) => "#" }
  @connection = Bunny.new(@config[:connection].symbolize_keys)
  @connection.start
end

Instance Method Details

#runObject



16
17
18
19
20
21
22
23
24
25
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 16

def run
  queue.subscribe(:block => true, :manual_ack => true) do |delivery_info, headers, payload|
    begin
      @processor.process(Message.new(delivery_info, headers, payload))
    rescue Exception => e
      $stderr.puts "rabbitmq_consumer: aborting due to unhandled exception in processor #{e.class}: #{e.message}"
      exit(1) # ensure rabbitmq requeues outstanding messages
    end
  end
end