Class: RabbitMQ::Actors::Base::Consumer Abstract
- Defined in:
- lib/rabbitmq/actors/base/consumer.rb
Overview
This class is abstract.
Subclass and override #perform to define actual consumer classes.
The base class to define actual RabbitMQ message consumer classes.
Direct Known Subclasses
HeadersConsumer, RoutingConsumer, Subscriber, TopicConsumer, Worker
Instance Attribute Summary
Attributes inherited from Agent
Instance Method Summary collapse
-
#initialize(queue_name: '', **opts) ⇒ Consumer
constructor
Rest of options required by your subclasses.
-
#start! ⇒ Object
Start listening to the queue and block waiting for a new task to be assigned by the server.
Constructor Details
#initialize(queue_name: '', **opts) ⇒ Consumer
Rest of options required by your subclasses.
27 28 29 |
# File 'lib/rabbitmq/actors/base/consumer.rb', line 27 def initialize(queue_name: '', **opts) super(opts.merge(queue_name: queue_name)) end |
Instance Method Details
#start! ⇒ Object
Start listening to the queue and block waiting for a new task to be assigned by the server. Perform the task, acknowledge if required and keep listening waiting for the message to come.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rabbitmq/actors/base/consumer.rb', line 34 def start! @_cancelled = false channel.prefetch(1) queue.subscribe(block: true, manual_ack: manual_ack?, on_cancellation: cancellation_handler) do |delivery_info, properties, body| begin logger.info(self.class.name) { "#{self} received task: #{body}" } self.perform_result = perform(delivery_info: delivery_info, properties: properties, body: body) done!(delivery_info) logger.info(self.class.name) { "#{self} performed task!" } rescue Exception => e logger.error "Error when #{self} performing task: #{e.}" cancellation_handler.call fail(e) end end cancellation_handler.call perform_result end |