Class: RabbitMQ::Actors::Base::Consumer Abstract

Inherits:
Agent
  • Object
show all
Defined in:
lib/rabbitmq/actors/base/consumer.rb

Overview

This class is abstract.

Subclass and override #perform to define actual consumer classes.

The base class to define actual RabbitMQ message consumer classes.

Examples:

module RabbitMQ
  module Actors
    class Worker < Base::Consumer
      def initialize(queue_name:, **opts)
        super(opts.merge(queue_name: queue_name))
      end
    end
  end
end

Instance Attribute Summary

Attributes inherited from Agent

#queue

Instance Method Summary collapse

Constructor Details

#initialize(queue_name: '', **opts) ⇒ Consumer

Rest of options required by your subclasses.

Parameters:

  • :queue_name (String)

    the queue name to bind the consumer with.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :manual_ack (Boolean)

    to tell RabbitMQ server not to remove the message from re-delivery until a manual acknowledgment from the consumer has been received.

  • :on_cancellation (Proc)

    to be executed if the agent is cancelled



27
28
29
# File 'lib/rabbitmq/actors/base/consumer.rb', line 27

def initialize(queue_name: '', **opts)
  super(opts.merge(queue_name: queue_name))
end

Instance Method Details

#start!Object

Start listening to the queue and block waiting for a new task to be assigned by the server. Perform the task, acknowledge if required and keep listening waiting for the message to come.

Raises:

  • (Exception)

    if something goes wrong during the execution of the task.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rabbitmq/actors/base/consumer.rb', line 34

def start!
  @_cancelled = false
  channel.prefetch(1)
  queue.subscribe(block: true, manual_ack: manual_ack?, on_cancellation: cancellation_handler) do |delivery_info, properties, body|
    begin
      logger.info(self.class.name) { "#{self} received task: #{body}" }
      self.perform_result = perform(delivery_info: delivery_info, properties: properties, body: body)
      done!(delivery_info)
      logger.info(self.class.name) { "#{self} performed task!" }
    rescue Exception => e
      logger.error "Error when #{self} performing task: #{e.message}"
      cancellation_handler.call
      fail(e)
    end
  end
  cancellation_handler.call
  perform_result
end