Class: RabbitMQ::Actors::TopicProducer

Inherits:
Base::Producer show all
Defined in:
lib/rabbitmq/actors/patterns/topics/topic_producer.rb

Overview

A producer of messages routed to all the queues bound to the message’s routing_key via matching patterns

Examples:

RabbitMQ::Server.url = 'amqp://localhost'

publisher = RabbitMQ::Actors::TopicProducer.new(topic_name: 'weather', logger: Rails.logger)
message   = { temperature: 20, rain: 30%, wind: 'NorthEast' }.to_json
publisher.publish(message, message_id: '1234837633', content_type: "application/json", routing_key: 'Europe.Spain.Madrid')

Instance Attribute Summary collapse

Attributes inherited from Base::Agent

#queue

Instance Method Summary collapse

Methods inherited from Base::Producer

#close

Constructor Details

#initialize(topic_name:, **opts) ⇒ TopicProducer

Returns a new instance of TopicProducer.

Parameters:

  • :topic_name (String)

    name of the topic exchange where to send messages to.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :reply_queue_name (String)

    the name of the queue where a consumer should reply.

  • :logger (Logger)

    the logger where to output info about this agent’s activity.



22
23
24
# File 'lib/rabbitmq/actors/patterns/topics/topic_producer.rb', line 22

def initialize(topic_name:, **opts)
  super(opts.merge(topic_name: topic_name))
end

Instance Attribute Details

#topic_nameObject (readonly)

Returns the value of attribute topic_name.



17
18
19
# File 'lib/rabbitmq/actors/patterns/topics/topic_producer.rb', line 17

def topic_name
  @topic_name
end

Instance Method Details

#publish(message, message_id:, routing_key:, **opts) ⇒ Object

Send a message to the RabbitMQ server.

Parameters:

  • message (String)

    the message body to be sent.

  • :message_id (String)

    user-defined id for replies to refer to this message using :correlation_id

  • :routing_key (String)

    send the message only to queues bound to this exchange and matching this routing_key

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :persistent (Boolean)

    Should the message be persisted to disk?. Default true.

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to. Default to replay_queue_name if it was defined at creation time.

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

See Also:

  • for extra options:


44
45
46
# File 'lib/rabbitmq/actors/patterns/topics/topic_producer.rb', line 44

def publish(message, message_id:, routing_key:, **opts)
  super(message, opts.merge(message_id: message_id, routing_key: routing_key))
end