Class: Ears::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/ears/publisher.rb

Overview

Publisher for sending messages to RabbitMQ exchanges.

Uses a connection pool for thread-safe publishing with configurable pool size. This provides better performance and thread safety compared to using per-thread channels.

Instance Method Summary collapse

Constructor Details

#initialize(exchange_name, exchange_type = :topic, exchange_options = {}) ⇒ Publisher

Creates a new publisher for the specified exchange.

Parameters:

  • exchange_name (String)

    The name of the exchange to publish to.

  • exchange_type (Symbol) (defaults to: :topic)

    The type of the exchange (:direct, :fanout, :topic or :headers).

  • exchange_options (Hash) (defaults to: {})

    The options for the exchange. These are passed on to +Bunny::Exchange.new+.



18
19
20
21
22
23
24
# File 'lib/ears/publisher.rb', line 18

def initialize(exchange_name, exchange_type = :topic, exchange_options = {})
  @exchange_name = exchange_name
  @exchange_type = exchange_type
  @exchange_options = { durable: true }.merge(exchange_options)
  @config = Ears.configuration
  @logger = Ears.configuration.logger
end

Instance Method Details

#publish(data, routing_key:, **options) ⇒ void

This method returns an undefined value.

Publishes a JSON message to the configured exchange.

Parameters:

  • data (Hash, Array, Object)

    The data to serialize as JSON and publish.

  • routing_key (String)

    The routing key for the message.

  • opts (Hash)

    a customizable set of options

Raises:



48
49
50
51
52
53
54
# File 'lib/ears/publisher.rb', line 48

def publish(data, routing_key:, **options)
  publish_options = default_publish_options.merge(options)

  retry_handler.run do
    publish_with_channel(data:, routing_key:, publish_options:)
  end
end

#publish_with_confirmation(data, routing_key:, **options) ⇒ void

This method returns an undefined value.

Publishes a message to the configured exchange with confirmation. Waits for RabbitMQ to confirm the message was received.

Parameters:

  • data (Hash, Array, Object)

    The data to serialize as JSON and publish.

  • routing_key (String)

    The routing key for the message.

  • opts (Hash)

    a customizable set of options

Raises:



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/ears/publisher.rb', line 80

def publish_with_confirmation(data, routing_key:, **options)
  publish_options = default_publish_options.merge(options)

  retry_handler.run do
    validate_connection!

    PublisherChannelPool.with_channel(confirms: true) do |channel|
      exchange = create_exchange(channel)

      publisher_confirmation_handler.publish_with_confirmation(
        channel: channel,
        exchange: exchange,
        data: data,
        routing_key: routing_key,
        options: publish_options,
      )
    end
  end
end

#reset!void

This method returns an undefined value.

Resets the channel pool, forcing new channels to be created. This can be useful for connection recovery scenarios.



104
105
106
# File 'lib/ears/publisher.rb', line 104

def reset!
  PublisherChannelPool.reset!
end