Class: Brown::Agent::AMQPPublisher

Inherits:
Object
Defined in:
lib/brown/agent/amqp_publisher.rb

Overview

Publish messages to an AMQP exchange.

Defined Under Namespace

Classes: BrokerError, Error, ExchangeError

Constant Summary

NoValue = Module.new

Sentinel used to detect that an argument was left at its "default" value, since nil can sometimes be a valid value.

Instance Method Summary

Constructor Details

#initialize(amqp_url: "amqp://localhost", exchange_type: :direct, exchange_name: "", routing_key: nil, message_type: nil, logger: Logger.new("/dev/null"), **amqp_opts) ⇒ AMQPPublisher

Create a new AMQPPublisher.

Set up an exchange in the AMQP broker, and allow the publishing of messages to that exchange.

Parameters:

  • amqp_url (#to_s) (defaults to: "amqp://localhost")

    the AMQP broker to connect to, specified as a URL. The scheme must be amqp. Username and password should be given in the standard fashion (amqp://<user>:<pass>@<host>).

    The path portion of AMQP URLs needs special handling: if you want to connect to the default vhost (/), you must either leave the path off entirely, with no trailing slash (i.e. amqp://hostname), or percent-encode the / vhost name (i.e. amqp://hostname/%2F). Yes, this drives me nuts, too.

  • exchange_type (Symbol) (defaults to: :direct)

    the type of exchange to create or publish to. By default, the exchange is created as a direct exchange; this routes messages to their destination queue(s) based on the routing_key (set per-publisher or per-message). Other valid values for this option are :fanout, :topic, and :headers.

  • exchange_name (#to_s) (defaults to: "")

    the name of the exchange to create or publish to. If not specified, then the "default" exchange is used, which is a direct exchange that routes to a queue with the same name as the routing key.

  • routing_key (#to_s) (defaults to: nil)

    The default "routing key" to attach to all messages sent via this publisher. This can also be set (or overridden) on a per-message basis; see #publish. If set to nil, no routing key will be set.

  • message_type (#to_s) (defaults to: nil)

    The default type for all messages sent via this publisher. This can also be set (or overridden) on a per-message basis; see #publish. If set to nil, no message type will be set by default.

  • logger (Logger) (defaults to: Logger.new("/dev/null"))

    somewhere to log everything.

  • amqp_opts (Hash)

    a "catch-all" hash for any other AMQP options you may wish to set by default for all messages you send via this publisher. There are quite a number of rather esoteric options, which Brown::Agent::AMQPPublisher does not treat specially, but if you really need them, they're here for you. They are passed through to the underlying Bunny exchange when publishing; see the Bunny documentation for full details of every possible permutation.
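
The vhost rule described for amqp_url can be sketched in a few lines. This is only an illustration: amqp_url_for is a hypothetical helper, not part of Brown, and it assumes ERB::Util.url_encode from the Ruby standard library is an acceptable way to percent-encode the credentials and vhost.

```ruby
require "erb"

# Hypothetical helper (not part of Brown): build an AMQP URL, percent-encoding
# the vhost so that the default vhost "/" becomes "%2F".
def amqp_url_for(host, vhost: nil, user: nil, pass: nil)
  creds = ""
  if user
    # Credentials are percent-encoded too, in case they contain ":" or "@".
    creds = "#{ERB::Util.url_encode(user)}:#{ERB::Util.url_encode(pass.to_s)}@"
  end

  url = "amqp://#{creds}#{host}"

  # Leaving the path off entirely (no trailing slash) selects the default vhost.
  return url if vhost.nil?

  "#{url}/#{ERB::Util.url_encode(vhost)}"
end

amqp_url_for("hostname")              # => "amqp://hostname"
amqp_url_for("hostname", vhost: "/")  # => "amqp://hostname/%2F"
```

Either of the two resulting URLs should reach the default vhost when passed as amqp_url.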

Raises:

  • (ArgumentError)

    if the parameters provided are problematic, such as specifying an invalid exchange type or exchange name.

  • (Brown::Agent::AMQPPublisher::BrokerError)

    if the attempt to connect to the broker fails, due to a lack of connection, or wrong credentials.

  • (Brown::Agent::AMQPPublisher::ExchangeError)

    if the attempt to create the exchange fails for some reason (such as the exchange already existing with a different configuration).



# File 'lib/brown/agent/amqp_publisher.rb', line 91

def initialize(amqp_url:      "amqp://localhost",
               exchange_type: :direct,
               exchange_name: "",
               routing_key:   nil,
               message_type:  nil,
               logger:        Logger.new("/dev/null"),
               **amqp_opts
              )
	begin
		@amqp_session = Bunny.new(amqp_url, logger: logger)
		@amqp_session.start
	rescue Bunny::TCPConnectionFailed
		raise BrokerError,
		      "Failed to connect to #{amqp_url}"
	rescue Bunny::PossibleAuthenticationFailureError
		raise BrokerError,
		      "Authentication failed for #{amqp_url}"
	rescue StandardError => ex
		raise Error,
		      "Unknown error occurred: #{ex.message} (#{ex.class})"
	end

	@amqp_channel = @amqp_session.create_channel

	begin
		@amqp_exchange = @amqp_channel.exchange(
		                   exchange_name,
		                   type: exchange_type,
		                   durable: true
		                 )
	rescue Bunny::PreconditionFailed => ex
		raise ExchangeError,
		      "Failed to open exchange: #{ex.message}"
	end

	@message_defaults = {
		:routing_key => routing_key,
		:type        => message_type
	}.merge(amqp_opts)

	@channel_mutex = Mutex.new
end

Instance Method Details

#publish(payload, type: NoValue, routing_key: NoValue, **amqp_opts) ⇒ Object

Publish a message to the exchange.

Parameters:

  • payload (#to_s)

    the "body" of the message to send.

  • type (#to_s) (defaults to: NoValue)

    override the default message type set in the publisher, just for this one message.

  • routing_key (#to_s) (defaults to: NoValue)

    override the default routing key set in the publisher, just for this one message.

  • amqp_opts (Hash)

    a "catch-all" hash for any other AMQP options you may wish to set for this one message. There are quite a number of rather esoteric options, which Brown::Agent::AMQPPublisher does not treat specially, but if you really need them, they're here for you. They are passed through to the underlying Bunny exchange; see the Bunny documentation for full details of every possible permutation.



# File 'lib/brown/agent/amqp_publisher.rb', line 152

def publish(payload, type: NoValue, routing_key: NoValue, **amqp_opts)
	opts = @message_defaults.merge(
	          {
	            type:        type,
	            routing_key: routing_key
	          }.delete_if { |_,v| v == NoValue }
	       ).delete_if { |_,v| v.nil? }.merge(amqp_opts)

	if @amqp_exchange.name == "" and opts[:routing_key].nil?
		raise ExchangeError,
		      "Cannot send a message to the default exchange without a routing key"
	end

	@channel_mutex.synchronize do
		@amqp_exchange.publish(payload, opts)
	end
end
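
How the publisher defaults, the per-message overrides and the NoValue sentinel combine is easiest to see in isolation. The following is a minimal sketch that mirrors the merge logic in #publish without needing a broker; NO_VALUE and effective_opts are stand-in names invented for this example, not part of the class.

```ruby
# Stand-in for the NoValue sentinel: a unique object that no caller can pass
# by accident, so "argument omitted" can be told apart from an explicit nil.
NO_VALUE = Module.new

# Hypothetical helper mirroring the option merge performed by #publish.
def effective_opts(defaults, type: NO_VALUE, routing_key: NO_VALUE, **amqp_opts)
  # Arguments that were omitted are dropped, so the publisher default survives.
  overrides = { type: type, routing_key: routing_key }
              .reject { |_, v| v.equal?(NO_VALUE) }

  defaults.merge(overrides)
          .reject { |_, v| v.nil? } # a nil default or override removes the option
          .merge(amqp_opts)         # extra AMQP options are applied last
end

defaults = { routing_key: "orders", type: "order.created" }

effective_opts(defaults)
# => {routing_key: "orders", type: "order.created"}
effective_opts(defaults, type: nil)
# => {routing_key: "orders"}
effective_opts(defaults, routing_key: "audit", persistent: true)
# => {routing_key: "audit", type: "order.created", persistent: true}
```

Passing nil explicitly therefore clears a default for one message, while omitting the argument keeps it. When the result has no :routing_key and the publisher uses the default exchange, #publish raises ExchangeError.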