Class: RabbitMQ::Actors::Base::Producer Abstract

Inherits:
Agent
  • Object
Defined in:
lib/rabbitmq/actors/base/producer.rb

Overview

This class is abstract.

Subclass and override #pre_initialize and #exchange to define actual producer classes.

Base class for defining concrete RabbitMQ message producer classes.

Examples:

module RabbitMQ::Actors
  class MasterProducer < Base::Producer
    def initialize(queue_name:, **opts)
      super(opts.merge(queue_name: queue_name))
    end

    def publish(message, message_id:, **opts)
      super(message, opts.merge(message_id: message_id, routing_key: queue.name))
    end

    private

    def exchange
      @exchange ||= channel.default_exchange
    end
  end
end

Instance Attribute Summary

Attributes inherited from Agent

#queue

Instance Method Summary

Methods inherited from Agent

#initialize

Constructor Details

This class inherits a constructor from RabbitMQ::Actors::Base::Agent

Instance Method Details

#close ⇒ Object Also known as: and_close

Close the channel to RabbitMQ. Log a message with info severity before and after closing.



# File 'lib/rabbitmq/actors/base/producer.rb', line 49

def close
  logger.info(self.class.name) { "Just Before #{self} closes RabbitMQ channel!" }
  channel.close
  logger.info(self.class.name) { "Just After #{self} closes RabbitMQ channel!" }
end
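
Because #publish returns self and #close is also available as #and_close, calls can be chained. The following is a minimal sketch of that pattern, not the library's code: SketchProducer is a hypothetical stand-in that records messages in memory instead of talking to a RabbitMQ server, so it can be run without a broker.

```ruby
# Hypothetical stand-in for illustration only; not part of rabbitmq-actors.
class SketchProducer
  attr_reader :sent, :closed

  def initialize
    @sent = []      # messages "published" so far
    @closed = false # whether close has been called
  end

  # Like Base::Producer#publish, returns self so calls can be chained.
  def publish(message, message_id:, **opts)
    @sent << [message, opts.merge(message_id: message_id)]
    self
  end

  # Stands in for closing the RabbitMQ channel.
  def close
    @closed = true
  end
  alias_method :and_close, :close
end

producer = SketchProducer.new
producer.publish("first", message_id: "1")
        .publish("second", message_id: "2")
        .and_close
```

With a real producer subclass the same chain would publish both messages and then close the channel.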

#publish(message, message_id:, **opts) ⇒ Object

Send a message to the RabbitMQ server. Log the action to the logger with info severity.

Parameters:

  • message (String)

    the message body to be sent.

  • :message_id (String)

    user-defined id for replies to refer to this message using :correlation_id

  • :opts (Hash)

    extra options passed on by your subclass; :persistent defaults to true unless given

See Also:

  • for extra options


# File 'lib/rabbitmq/actors/base/producer.rb', line 36

def publish(message, message_id:, **opts)
  options = opts.merge(message_id: message_id).reverse_merge!(persistent: true)
  options.merge!(reply_to: reply_queue.name) if reply_queue
  logger.info(self.class.name) { "Just Before #{self} publishes message: #{message} with options: #{options}" }
  exchange.publish(message, options)
  logger.info(self.class.name) { "Just After #{self} publishes message: #{message} with options: #{options}" }
  self
end
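
The option handling in #publish can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the library's implementation: build_publish_options is a hypothetical helper, and plain Hash#merge stands in for ActiveSupport's reverse_merge! so the snippet runs without extra gems. It shows that persistent: true is only a default the caller can override, and that :reply_to is added only when a reply queue name is available.

```ruby
# Hypothetical helper mirroring the option handling in #publish.
# Assumption: Hash#merge replaces ActiveSupport's reverse_merge! here.
def build_publish_options(message_id:, reply_queue_name: nil, **opts)
  # Defaults come first so caller-supplied options win over them.
  options = { persistent: true }.merge(opts).merge(message_id: message_id)
  # :reply_to is set only when a reply queue exists.
  options[:reply_to] = reply_queue_name if reply_queue_name
  options
end

default_opts  = build_publish_options(message_id: "42")
override_opts = build_publish_options(message_id: "42",
                                      persistent: false,
                                      reply_queue_name: "replies")
```

Here default_opts carries the persistent default, while override_opts keeps the caller's persistent: false and gains a :reply_to entry.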