Class: Brown::Agent::AMQPPublisher
- Inherits:
-
Object
- Object
- Brown::Agent::AMQPPublisher
- Defined in:
- lib/brown/agent/amqp_publisher.rb
Overview
Publish messages to an AMQP exchange.
Defined Under Namespace
Classes: BrokerError, Error, ExchangeError
Constant Summary collapse
- NoValue =
:nodoc: Sentinel to detect that we've been sent the "default" value, since
nil
can, sometimes, be a valid value. Module.new
Instance Method Summary collapse
-
#initialize(amqp_url: "amqp://localhost", exchange_type: :direct, exchange_name: "", routing_key: nil, message_type: nil, logger: Logger.new("/dev/null"), **amqp_opts) ⇒ AMQPPublisher
constructor
Create a new AMQPPublisher.
-
#publish(payload, type: NoValue, routing_key: NoValue, **amqp_opts) ⇒ Object
Publish a message to the exchange.
Constructor Details
#initialize(amqp_url: "amqp://localhost", exchange_type: :direct, exchange_name: "", routing_key: nil, message_type: nil, logger: Logger.new("/dev/null"), **amqp_opts) ⇒ AMQPPublisher
Create a new AMQPPublisher.
Setup an exchange in the AMQP broker, and allow the publishing of messages to that exchange.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/brown/agent/amqp_publisher.rb', line 91 def initialize(amqp_url: "amqp://localhost", exchange_type: :direct, exchange_name: "", routing_key: nil, message_type: nil, logger: Logger.new("/dev/null"), **amqp_opts ) begin @amqp_session = Bunny.new(amqp_url, logger: logger) @amqp_session.start rescue Bunny::TCPConnectionFailed raise BrokerError, "Failed to connect to #{amqp_url}" rescue Bunny::PossibleAuthenticationFailureError raise BrokerError, "Authentication failed for #{amqp_url}" rescue StandardError => ex raise Error, "Unknown error occured: #{ex.} (#{ex.class})" end @amqp_channel = @amqp_session.create_channel begin @amqp_exchange = @amqp_channel.exchange( exchange_name, type: exchange_type, durable: true ) rescue Bunny::PreconditionFailed => ex raise ExchangeError, "Failed to open exchange: #{ex.}" end @message_defaults = { :routing_key => routing_key, :type => }.merge(amqp_opts) @channel_mutex = Mutex.new end |
Instance Method Details
#publish(payload, type: NoValue, routing_key: NoValue, **amqp_opts) ⇒ Object
Publish a message to the exchange.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/brown/agent/amqp_publisher.rb', line 152 def publish(payload, type: NoValue, routing_key: NoValue, **amqp_opts) opts = @message_defaults.merge( { type: type, routing_key: routing_key }.delete_if { |_,v| v == NoValue } ).delete_if { |_,v| v.nil? }.merge(amqp_opts) if @amqp_exchange.name == "" and opts[:routing_key].nil? raise ExchangeError, "Cannot send a message to the default exchange without a routing key" end @channel_mutex.synchronize do @amqp_exchange.publish(payload, opts) end end |