Class: GovukMessageQueueConsumer::Consumer
- Inherits: Object
- Inheritance chain: Object, GovukMessageQueueConsumer::Consumer
- Defined in: lib/govuk_message_queue_consumer/consumer.rb
Instance Method Summary
- #initialize(config, processor) ⇒ Consumer (constructor): A new instance of Consumer.
- #run ⇒ Object
Constructor Details
#initialize(config, processor) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 6
def initialize(config, processor)
  @processor = HeartbeatProcessor.new(processor)
  @config = config.with_indifferent_access
  @queue_name = @config.fetch(:queue)
  @bindings = { @config.fetch(:exchange) => "#" }
  @connection = Bunny.new(@config[:connection].symbolize_keys)
  @connection.start
end
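The constructor reads three keys from `config`: `queue`, `exchange`, and `connection`. The following is a minimal sketch of the values it derives, written in plain Ruby so it runs without ActiveSupport, Bunny, or a broker. The helper name `consumer_settings` and all config values are hypothetical, and `transform_keys` is used here only as a stand-in for `with_indifferent_access` and `symbolize_keys`.

```ruby
# Hypothetical helper (not part of the gem) that mirrors what the constructor
# derives from its config. Plain Ruby only: no ActiveSupport, no Bunny.
def consumer_settings(config)
  # Stand-in for with_indifferent_access: accept string or symbol keys.
  config = config.transform_keys(&:to_sym)

  {
    # fetch raises KeyError when the key is missing, as in the constructor.
    queue_name: config.fetch(:queue),
    # On an AMQP topic exchange, the binding key "#" matches every routing key.
    bindings: { config.fetch(:exchange) => "#" },
    # Stand-in for symbolize_keys on the connection options.
    connection: config[:connection].transform_keys(&:to_sym)
  }
end

# Illustrative values only; real settings depend on the deployment.
settings = consumer_settings({
  "queue" => "example_queue",
  "exchange" => "example_exchange",
  "connection" => { "host" => "localhost", "port" => 5672 }
})

puts settings.inspect
```

A missing `queue` or `exchange` key fails immediately with `KeyError`, which matches the use of `fetch` in the constructor.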
Instance Method Details
#run ⇒ Object
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 16
def run
  queue.subscribe(:block => true, :manual_ack => true) do |delivery_info, headers, payload|
    begin
      @processor.process(Message.new(delivery_info, headers, payload))
    rescue Exception => e
      $stderr.puts "rabbitmq_consumer: aborting due to unhandled exception in processor #{e.class}: #{e.message}"
      exit(1) # ensure rabbitmq requeues outstanding messages
    end
  end
end
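`#run` passes each delivery to the processor's `process` method, and because the subscription uses `:manual_ack => true`, nothing is acknowledged automatically. The sketch below shows one possible shape for a processor object. `FakeMessage` is a hypothetical stand-in for `Message` so the example runs without RabbitMQ, and its `ack` method is an assumption: the listing above does not show the interface of `Message`. `RecordingProcessor` is likewise an illustrative name.

```ruby
# Hypothetical stand-in for GovukMessageQueueConsumer::Message.
# The `ack` method is an assumption; it is not shown in the source above.
class FakeMessage
  attr_reader :delivery_info, :headers, :payload

  def initialize(delivery_info, headers, payload)
    @delivery_info = delivery_info
    @headers = headers
    @payload = payload
    @acked = false
  end

  def ack
    @acked = true
  end

  def acked?
    @acked
  end
end

# Illustrative processor: any object responding to `process(message)` fits
# the way #run calls @processor.process.
class RecordingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message.payload
    # With manual acknowledgement, the processor is responsible for acking.
    message.ack
  end
end

processor = RecordingProcessor.new
message = FakeMessage.new(nil, {}, "example payload")
processor.process(message)
puts "acked: #{message.acked?}"
```

Against a real broker, an object like this would be passed as the second argument to `GovukMessageQueueConsumer::Consumer.new(config, processor)` before calling `run`. Any exception that escapes `process` reaches the `rescue Exception` branch of `#run`, which logs to standard error and exits with status 1.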