Class: PublishingPlatformMessageQueueConsumer::Consumer
- Inherits:
-
Object
- Object
- PublishingPlatformMessageQueueConsumer::Consumer
- Defined in:
- lib/publishing_platform_message_queue_consumer/consumer.rb
Instance Method Summary collapse
-
#initialize(queue_name:, processor:, rabbitmq_connection: Bunny.new, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer
constructor
Create a new consumer.
- #run(subscribe_opts: {}) ⇒ Object
Constructor Details
#initialize(queue_name:, processor:, rabbitmq_connection: Bunny.new, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer
Create a new consumer
16 17 18 19 20 21 22 23 |
# File 'lib/publishing_platform_message_queue_consumer/consumer.rb', line 16 def initialize(queue_name:, processor:, rabbitmq_connection: Bunny.new, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) @queue_name = queue_name @processor = processor @rabbitmq_connection = rabbitmq_connection @logger = logger @worker_threads = worker_threads @prefetch = prefetch end |
Instance Method Details
#run(subscribe_opts: {}) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/publishing_platform_message_queue_consumer/consumer.rb', line 25 def run(subscribe_opts: {}) @rabbitmq_connection.start subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts) queue.subscribe(subscribe_opts) do |delivery_info, headers, payload| = Message.new(payload, headers, delivery_info) .process() rescue StandardError => e PublishingPlatformError.notify(e) if defined?(PublishingPlatformError) @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.message}\n\n#{e.backtrace.join("\n")}" exit(1) # Ensure rabbitmq requeues outstanding messages end rescue SignalException => e PublishingPlatformError.notify(e) if defined?(PublishingPlatformError) && e. != "SIGTERM" exit end |