Class: RabbitMQ::Actors::HeadersProducer

Inherits:
Base::Producer
Defined in:
lib/rabbitmq/actors/patterns/headers/headers_producer.rb

Overview

A producer of messages that a headers exchange routes to the queues whose bindings match the message headers.

Examples:

RabbitMQ::Server.url = 'amqp://localhost'

publisher = RabbitMQ::Actors::HeadersProducer.new(headers_name: 'reports', logger: Rails.logger)
message = 'A report about USA economy'
publisher.publish(message, message_id: '1234837633', headers: { 'type' => :economy, 'area' => 'USA'})
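For intuition about which queues would receive a message like the one above, the following sketch mimics headers-exchange matching in plain Ruby. It does not use this gem or talk to a broker: `binding_matches?` is a hypothetical helper and the binding arguments are made up for illustration. The only behavior assumed from AMQP is the `x-match` binding argument, which is `all` by default and may be set to `any`. Header values are written as strings here for simplicity.

```ruby
# Hypothetical helper, not part of rabbitmq-actors: mimics how a headers
# exchange decides whether a queue binding matches a message's headers.
def binding_matches?(binding_args, message_headers)
  mode = binding_args.fetch('x-match', 'all')
  # Arguments starting with "x-" configure the match and are not compared.
  criteria = binding_args.reject { |key, _| key.start_with?('x-') }
  hits = criteria.count { |key, value| message_headers[key] == value }
  mode == 'any' ? hits.positive? : hits == criteria.size
end

headers = { 'type' => 'economy', 'area' => 'USA' }

# Illustrative bindings: same criteria, different match modes.
all_binding = { 'x-match' => 'all', 'type' => 'economy', 'area' => 'Europe' }
any_binding = { 'x-match' => 'any', 'type' => 'economy', 'area' => 'Europe' }

puts binding_matches?(all_binding, headers) # false: 'area' differs
puts binding_matches?(any_binding, headers) # true: 'type' matches
```

A queue bound with `all_binding` would not receive the message, while one bound with `any_binding` would.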

Instance Attribute Summary

Attributes inherited from Base::Agent

#queue

Instance Method Summary

Methods inherited from Base::Producer

#close

Constructor Details

#initialize(headers_name:, **opts) ⇒ HeadersProducer

Returns a new instance of HeadersProducer.

Parameters:

  • :headers_name (String)

    name of the headers exchange to publish messages to.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :reply_queue_name (String)

    the name of the queue where a consumer should reply.

  • :logger (Logger)

    the logger that receives info about this agent’s activity.



# File 'lib/rabbitmq/actors/patterns/headers/headers_producer.rb', line 22

def initialize(headers_name:, **opts)
  super(opts.merge(headers_name: headers_name))
end
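The constructor folds the required `headers_name:` keyword back into the options hash before delegating to its parent. Below is a minimal sketch of that pattern in isolation. `SketchBase` and `SketchHeadersProducer` are hypothetical stand-ins: the real `Base::Producer` is not reproduced on this page, so the parent simply storing the hash is an assumption.

```ruby
# Stand-in for the parent class; the real Base::Producer is not shown here,
# so storing the options hash is an assumption made for illustration.
class SketchBase
  attr_reader :options

  def initialize(opts = {})
    @options = opts
  end
end

class SketchHeadersProducer < SketchBase
  # headers_name: is required; every other keyword is collected into opts.
  def initialize(headers_name:, **opts)
    super(opts.merge(headers_name: headers_name))
  end
end

producer = SketchHeadersProducer.new(headers_name: 'reports', reply_queue_name: 'replies')
p producer.options

begin
  # Omitting the required keyword fails before the parent is ever called.
  SketchHeadersProducer.new(reply_queue_name: 'replies')
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Declaring `headers_name:` as a required keyword makes a missing exchange name fail at construction time, while the parent still receives a single options hash.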

Instance Attribute Details

#headers_name ⇒ Object (readonly)

Returns the value of attribute headers_name.



# File 'lib/rabbitmq/actors/patterns/headers/headers_producer.rb', line 17

def headers_name
  @headers_name
end

Instance Method Details

#publish(message, message_id:, headers:, **opts) ⇒ Object

Send a message to the RabbitMQ server.

Parameters:

  • message (String)

    the message body to be sent.

  • :message_id (String)

    user-defined ID that replies can use, as their :correlation_id, to refer to this message.

  • :headers (Hash)

    deliver the message only to queues bound to this exchange whose bindings match any or all of these headers, depending on each binding's match mode.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :persistent (Boolean)

    Should the message be persisted to disk? Default: true.

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to. Defaults to reply_queue_name if it was defined at creation time.

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID
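As a rough illustration of how several of these options might be combined, the sketch below only builds the message body and the keyword arguments. It does not contact a broker, and the final `publish` call is left commented out. The report contents and the choice of a UUID for `:message_id` are made up, and using Unix epoch seconds for `:timestamp` is an assumption about the expected integer format.

```ruby
require 'json'
require 'securerandom'

# Illustrative payload; any string body would do.
report = { 'title' => 'A report about USA economy' }
body = JSON.generate(report)

# Keyword arguments for #publish, using only options listed above.
publish_args = {
  message_id: SecureRandom.uuid,                    # any user-defined id
  headers: { 'type' => 'economy', 'area' => 'USA' },
  content_type: 'application/json',
  timestamp: Time.now.to_i,                         # assumption: epoch seconds
  mandatory: true
}

puts body
puts publish_args.keys.inspect

# With a reachable broker and a `publisher` built as in the Examples section:
# publisher.publish(body, **publish_args)
```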

See Also:

  • for extra options:


# File 'lib/rabbitmq/actors/patterns/headers/headers_producer.rb', line 44

def publish(message, message_id:, headers:, **opts)
  super(message, opts.merge(message_id: message_id, headers: headers))
end