Class: RabbitMQ::Actors::MasterProducer

Inherits:
Base::Producer show all
Defined in:
lib/rabbitmq/actors/patterns/master_workers/master_producer.rb

Overview

A producer of messages routed (via a default exchange) to a given queue. Used to distribute tasks among several worker processes listening a shared queue.

Examples:

RabbitMQ::Server.url = 'amqp://localhost'

master = RabbitMQ::Actors::MasterProducer.new(
  queue_name:       'purchases',
  auto_delete:      false,
  reply_queue_name: 'confirmations',
  logger:           Rails.logger)

message = { stock: 'Apple', number: 1000 }.to_json
master.publish(message, message_id: '1234837325', content_type: "application/json")

Instance Attribute Summary

Attributes inherited from Base::Agent

#queue

Instance Method Summary collapse

Methods inherited from Base::Producer

#close

Constructor Details

#initialize(queue_name:, **opts) ⇒ MasterProducer

Returns a new instance of MasterProducer.

Parameters:

  • :queue_name (String)

    name of the durable queue where to publish messages.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :auto_delete (Boolean) — default: true

    if the queue will be deleted when there are no more consumers subscribed to it.

  • :reply_queue_name (String)

    the name of the queue where a consumer should reply.

  • :logger (Logger)

    the logger where to output info about this agent’s activity.



27
28
29
# File 'lib/rabbitmq/actors/patterns/master_workers/master_producer.rb', line 27

def initialize(queue_name:, **opts)
  super(opts.merge(queue_name: queue_name, exclusive: false))
end

Instance Method Details

#publish(message, message_id:, **opts) ⇒ Object

Send a message to the RabbitMQ server.

Parameters:

  • message (String)

    the message body to be sent.

  • :message_id (String)

    user-defined id for replies to refer to this message using :correlation_id

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :persistent (Boolean)

    Should the message be persisted to disk?. Default true.

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to. Default to replay_queue_name if it was defined at creation time.

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

See Also:

  • for extra options:


48
49
50
# File 'lib/rabbitmq/actors/patterns/master_workers/master_producer.rb', line 48

def publish(message, message_id:, **opts)
  super(message, opts.merge(message_id: message_id, routing_key: queue.name))
end