Class: RabbitFeed::ProducerConnection

Inherits:
Connection
  • Object
show all
Defined in:
lib/rabbit_feed/producer_connection.rb

Constant Summary collapse

# Options merged into every publish call; frozen so they cannot be altered at runtime.
PUBLISH_OPTIONS = {
  # Persist the message to disk
  persistent: true,
  # Return the message if it can't be routed to a queue
  mandatory: true
}.freeze
# Exchange settings; frozen so they cannot be altered at runtime.
EXCHANGE_OPTIONS = {
  # Allow wildcard routing keys
  type: :topic,
  # Persist across server restart
  durable: true,
  # Create the exchange if it does not exist
  no_declare: false
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ProducerConnection

Returns a new instance of ProducerConnection.



19
20
21
22
23
24
25
26
# File 'lib/rabbit_feed/producer_connection.rb', line 19

# Runs the parent initializer, obtains the configured exchange from the
# channel, logs the declaration, and registers a callback that forwards any
# returned message to ProducerConnection.handle_returned_message.
def initialize
  super
  @exchange = channel.exchange(RabbitFeed.configuration.exchange, exchange_options)
  RabbitFeed.log.info do
    { event: :exchange_declared, exchange: RabbitFeed.configuration.exchange, options: exchange_options }
  end
  # The message properties yielded by on_return are not used here.
  exchange.on_return do |info, _props, body|
    RabbitFeed::ProducerConnection.handle_returned_message(info, body)
  end
end

Class Method Details

.handle_returned_message(return_info, _content) ⇒ Object



14
15
16
17
# File 'lib/rabbit_feed/producer_connection.rb', line 14

# Logs a returned message at error level and reports it through
# RabbitFeed.exception_notify as a ReturnedMessageError.
#
# @param return_info [Object] return details, logged and passed to the error
# @param _content [Object] the returned message body (unused)
# @return [Object] whatever RabbitFeed.exception_notify returns
def self.handle_returned_message(return_info, _content)
  RabbitFeed.log.error do
    { event: :returned_message, return_info: return_info }
  end
  returned_error = ReturnedMessageError.new(return_info)
  RabbitFeed.exception_notify(returned_error)
end

Instance Method Details

#publish(message, options) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/rabbit_feed/producer_connection.rb', line 28

# Publishes a message to the exchange inside the synchronized block.
#
# @param message [Object] the payload handed to exchange.publish
# @param options [Hash] caller-supplied publish options
# @return [Object] whatever the synchronized block returns
def publish(message, options)
  synchronized do
    # PUBLISH_OPTIONS is merged last, so its values win over duplicate caller keys.
    merged_options = options.merge(PUBLISH_OPTIONS)
    # The log records the caller's options, not the merged set.
    RabbitFeed.log.debug do
      { event: :publish, options: options, exchange: RabbitFeed.configuration.exchange }
    end
    exchange.publish(message, merged_options)
  end
end