Class: Ears::Publisher
- Inherits: Object
  - Object
  - Ears::Publisher
- Defined in: lib/ears/publisher.rb
Overview
Publisher for sending messages to RabbitMQ exchanges.
Uses a connection pool for thread-safe publishing with configurable pool size. This provides better performance and thread safety compared to using per-thread channels.
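The pooling idea described above can be illustrated with a minimal sketch. It is not the implementation behind `PublisherChannelPool`; the class `SketchChannelPool`, its `size:` argument, and the `available` helper are hypothetical names used only for illustration, and `Object.new` stands in for a real channel.

```ruby
# Hypothetical stand-in for a channel pool; not Ears' PublisherChannelPool.
class SketchChannelPool
  def initialize(size:)
    @channels = Queue.new                 # thread-safe FIFO from Ruby core
    size.times { @channels << yield }     # the block builds one channel per slot
  end

  # Checks out one channel, yields it, and always returns it to the pool,
  # even when the block raises.
  def with_channel
    channel = @channels.pop               # blocks while every channel is in use
    yield channel
  ensure
    @channels << channel if channel
  end

  # Number of channels currently idle in the pool.
  def available
    @channels.size
  end
end

pool = SketchChannelPool.new(size: 2) { Object.new } # Object.new stands in for a channel

# Four threads share two channels; each thread holds a channel exclusively
# for the duration of its block.
threads = 4.times.map do
  Thread.new { pool.with_channel { |channel| channel.object_id } }
end
threads.each(&:join)
```

Because a channel is handed to one thread at a time and the pool size is fixed, the number of open channels stays bounded regardless of how many threads publish.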
Instance Method Summary
- #initialize(exchange_name, exchange_type = :topic, exchange_options = {}) ⇒ Publisher (constructor)
  Creates a new publisher for the specified exchange.
- #publish(data, routing_key:, **options) ⇒ void
  Publishes a JSON message to the configured exchange.
- #publish_with_confirmation(data, routing_key:, **options) ⇒ void
  Publishes a message to the configured exchange with confirmation.
- #reset! ⇒ void
  Resets the channel pool, forcing new channels to be created.
Constructor Details
#initialize(exchange_name, exchange_type = :topic, exchange_options = {}) ⇒ Publisher
Creates a new publisher for the specified exchange.
    # File 'lib/ears/publisher.rb', line 18
    def initialize(exchange_name, exchange_type = :topic, exchange_options = {})
      @exchange_name = exchange_name
      @exchange_type = exchange_type
      @exchange_options = { durable: true }.merge(exchange_options)
      @config = Ears.configuration
      @logger = Ears.configuration.logger
    end
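The constructor merges caller-supplied exchange options over a `durable: true` default. The sketch below isolates that merge so its effect is easy to see; the module `ExchangeOptionsSketch` and its `resolve` method are hypothetical names, not part of Ears.

```ruby
# Hypothetical helper that mirrors the merge performed in the constructor.
module ExchangeOptionsSketch
  DEFAULTS = { durable: true }.freeze

  module_function

  # Caller-supplied keys win over the defaults; missing keys fall back to them.
  def resolve(exchange_options = {})
    DEFAULTS.merge(exchange_options)
  end
end

defaults_only = ExchangeOptionsSketch.resolve                    # durable stays true
with_extra    = ExchangeOptionsSketch.resolve(auto_delete: true) # durable true, plus auto_delete
overridden    = ExchangeOptionsSketch.resolve(durable: false)    # caller overrides durable
```

In practice this means exchanges are declared durable unless the caller passes `durable: false` explicitly.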
Instance Method Details
#publish(data, routing_key:, **options) ⇒ void
This method returns an undefined value.
Publishes a JSON message to the configured exchange.
    # File 'lib/ears/publisher.rb', line 48
    def publish(data, routing_key:, **options)
      # The receiver of merge is missing from the extracted source.
      publish_options = ….merge(options)

      retry_handler.run do
        publish_with_channel(data:, routing_key:, publish_options:)
      end
    end
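`#publish` wraps the actual publish call in `retry_handler.run`. How Ears' retry handler decides when and how often to retry is not shown here, so the following is only a sketch of what a wrapper of that shape might look like. `SketchRetryHandler`, `max_attempts:`, and `base_delay:` are hypothetical names, and retrying on any `StandardError` is an assumption.

```ruby
# Hypothetical retry wrapper; Ears' own retry handler may behave differently.
class SketchRetryHandler
  def initialize(max_attempts: 3, base_delay: 0.0)
    @max_attempts = max_attempts
    @base_delay = base_delay
  end

  # Runs the block and returns its value. On StandardError the block is
  # re-run until max_attempts is reached, then the last error is re-raised.
  def run
    attempts = 0
    begin
      attempts += 1
      yield
    rescue StandardError
      raise if attempts >= @max_attempts

      sleep(@base_delay * attempts) # linear backoff; zero delay by default
      retry
    end
  end
end

calls = 0
handler = SketchRetryHandler.new(max_attempts: 3)

# The block fails twice, then succeeds on the third attempt.
result = handler.run do
  calls += 1
  raise IOError, 'broker unavailable' if calls < 3

  :published
end
```

Keeping the retry policy in a separate object lets the publish methods share it without duplicating the loop.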
#publish_with_confirmation(data, routing_key:, **options) ⇒ void
This method returns an undefined value.
Publishes a message to the configured exchange with confirmation. Waits for RabbitMQ to confirm the message was received.
    # File 'lib/ears/publisher.rb', line 80
    def publish_with_confirmation(data, routing_key:, **options)
      # The receiver of merge is missing from the extracted source.
      publish_options = ….merge(options)

      retry_handler.run do
        validate_connection!

        PublisherChannelPool.with_channel(confirms: true) do |channel|
          exchange = create_exchange(channel)

          publisher_confirmation_handler.publish_with_confirmation(
            channel: channel,
            exchange: exchange,
            data: data,
            routing_key: routing_key,
            options: publish_options,
          )
        end
      end
    end
#reset! ⇒ void
This method returns an undefined value.
Resets the channel pool, forcing new channels to be created. This can be useful for connection recovery scenarios.
    # File 'lib/ears/publisher.rb', line 104
    def reset!
      PublisherChannelPool.reset!
    end