Class: AMQP::Exchange

Inherits:
Object
  • Object
Extended by:
ProtocolMethodHandlers
Includes:
Entity
Defined in:
lib/amqp/exchange.rb

Overview

Note:

Please make sure you read the section on exchange durability vs. message persistence.

What are AMQP exchanges?

An AMQP exchange is where AMQP clients send messages. An exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and the message attributes, determines how to deliver the message.

Entities that forward messages to consumers (or that consumers fetch messages from on demand) are called queues. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine which queues receive the messages published to a given exchange.

AMQP bindings

Closely related to exchanges is the concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the application that owns and uses the message queue sets up bindings for it). An exchange may be bound to zero, one or more queues.

Exchange types

There are 4 supported exchange types: direct, fanout, topic and headers. The exchange type determines how the exchange processes and routes messages.

Direct exchanges

Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called “routing key”. When a message arrives at a direct exchange, the broker takes that message’s routing key (if any), finds a queue bound to the exchange with the same routing key and routes the message there.

Because queues are very often bound with a routing key equal to the queue’s name, AMQP 0.9.1 has a pre-declared direct exchange known as the default exchange. The default exchange is a bit special: the broker automatically binds all queues (in the same virtual host) to it with a routing key equal to the queue name. In other words, messages delivered to the default exchange are routed to a queue when the message routing key equals the queue name. The default exchange’s name is an empty string.
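The routing rule described above can be sketched with a small in-memory model. This is only an illustration of the semantics, not the amqp gem API: the class ToyDirectExchange and its methods are hypothetical names, and no broker is involved.

```ruby
# Toy in-memory model of direct-exchange routing (hypothetical, not the amqp gem API).
class ToyDirectExchange
  def initialize
    # routing key => names of the queues bound with that key
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Bind a queue to the exchange under a routing key.
  def bind(queue_name, routing_key)
    @bindings[routing_key] << queue_name
    self
  end

  # Names of the queues a message with this routing key would be routed to.
  def route(routing_key)
    @bindings.fetch(routing_key, []).dup
  end
end

# The default exchange behaves as if every queue were bound under its own name.
default_exchange = ToyDirectExchange.new
%w[tasks emails].each { |queue| default_exchange.bind(queue, queue) }

puts default_exchange.route("tasks").inspect
puts default_exchange.route("no.such.queue").inspect
```

In this model a message whose routing key matches no binding is routed nowhere, which corresponds to the broker dropping (or returning) an unroutable message.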

As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange with a name starting with ‘amq.’ will fail with a channel-level exception. In practice these predeclared exchanges are never used directly by client code.

Fanout exchanges

Fanout exchanges are useful for 1:n and n:m communication where one or more producers feed multiple consumers. Messages published to a fanout exchange are delivered unconditionally to all queues bound to that exchange. Each queue gets its own copy of the message.
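A minimal sketch of the fanout behaviour, again as a toy in-memory model rather than the amqp gem API (ToyFanoutExchange and its methods are hypothetical names). It shows that every bound queue receives its own copy of each message and that no routing key is consulted.

```ruby
# Toy in-memory model of a fanout exchange (hypothetical, not the amqp gem API).
class ToyFanoutExchange
  def initialize
    # queue name => messages delivered to that queue
    @queues = {}
  end

  # Bind a queue; fanout bindings need no routing key.
  def bind(queue_name)
    @queues[queue_name] ||= []
    self
  end

  # Deliver an independent copy of the payload to every bound queue.
  def publish(payload)
    @queues.each_value { |messages| messages << payload.dup }
    self
  end

  # Messages delivered so far to the given queue.
  def messages_in(queue_name)
    @queues.fetch(queue_name, [])
  end
end

exchange = ToyFanoutExchange.new.bind("indexer").bind("audit")
exchange.publish("document 42 updated")
puts exchange.messages_in("indexer").inspect
puts exchange.messages_in("audit").inspect
```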

Topic exchanges

Topic exchanges are used for 1:n and n:m communication scenarios. An exchange of this type uses the routing key to determine which queues to deliver the message to. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.

As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.

The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use the topic stock.us.nasdaq, while DAX may use stock.de.dax.

Queues bound to the exchange indicate which data interests them by supplying a routing key that is matched against the routing key of each published message.
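The wildcard matching can be sketched as a word-by-word comparison. The sketch assumes the AMQP 0.9.1 topic conventions, which the text above does not spell out: `*` stands for exactly one dot-separated word and `#` for zero or more words. The function names topic_match? and match_words are hypothetical and not part of the amqp gem.

```ruby
# Toy matcher for topic-exchange binding patterns (hypothetical helper names).
# Assumes AMQP 0.9.1 conventions: "*" = exactly one word, "#" = zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted routing key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # "#" either absorbs nothing, or absorbs one word and is tried again.
    match_words(rest, key_words) ||
      (!key_words.empty? && match_words(pattern_words, key_words.drop(1)))
  when "*"
    # "*" must consume exactly one word.
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # Literal words must be equal.
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

puts topic_match?("stock.us.*", "stock.us.aapl")
puts topic_match?("stock.#", "stock.de.dax")
puts topic_match?("stock.*", "stock.us.aapl")
```

With these rules a queue bound with stock.us.* would receive updates for every US symbol, while one bound with stock.# would receive all stock updates regardless of nation.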

Headers exchanges

When data is published to an exchange of type headers, bound queues indicate which data interests them by passing arguments that are matched against the headers of published messages. The form of the matching is controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to ‘all’.

A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
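The AND/OR behaviour of ‘x-match’ can be sketched as follows. This is a simplified model under stated assumptions: headers_match? is a hypothetical helper, keys are plain strings, values are compared with ==, and only the ‘x-match’ key itself is excluded from comparison. A real broker may apply additional rules.

```ruby
# Toy matcher for headers-exchange bindings (hypothetical helper, simplified rules).
def headers_match?(binding_arguments, message_headers)
  # "all" is the default when x-match is not given.
  mode     = binding_arguments.fetch("x-match", "all")
  # The x-match argument configures matching and is not itself compared.
  expected = binding_arguments.reject { |key, _value| key == "x-match" }
  results  = expected.map { |key, value| message_headers[key] == value }

  mode == "any" ? results.any? : results.all?
end

binding_args = { "x-match" => "any", "format" => "pdf", "type" => "report" }
headers      = { "format" => "pdf", "type" => "log" }

puts headers_match?(binding_args, headers)
puts headers_match?(binding_args.merge("x-match" => "all"), headers)
```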

As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.

Key methods

Key methods of Exchange class are

Exchange durability and persistence of messages.

Learn more in our Durability guide.

RabbitMQ extensions.

AMQP gem supports several RabbitMQ extensions that extend Exchange functionality. Learn more in VendorSpecificExtensions.

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze
BUILTIN_TYPES =
[:fanout, :direct, :topic, :headers].freeze

Constants included from Openable

Openable::VALUES

Instance Attribute Summary

Attributes included from Entity

#callbacks

Declaration

Exchange to Exchange Bindings

Publishing Messages

Error Handling and Recovery

Class Method Summary

Instance Method Summary

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange

See Exchange class documentation for an introduction, information about exchange types, what use cases they are good for and so on.

Predeclared exchanges

If the exchange name corresponds to one of those predeclared by the AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), the declaration command won’t be sent to the broker (the only possible reply from the broker is to reject it, because predefined entities cannot be changed). The callback, if any, will be executed immediately.
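A standalone sketch of how such names can be recognized. The pattern is modelled on the one used by #predefined? in the source listing on this page; the constant and method names here are hypothetical, and the check is a prefix match, so it is deliberately loose.

```ruby
# Hypothetical helper modelled on the name check in Exchange#predefined?.
PREDECLARED_NAME_PATTERN = /\Aamq\.(direct|fanout|topic|headers|match)/i

def predeclared_exchange_name?(name)
  # The empty string names the default exchange.
  name.empty? || PREDECLARED_NAME_PATTERN.match?(name)
end

["", "amq.topic", "amq.match", "search.index.updates"].each do |name|
  puts "#{name.inspect}: #{predeclared_exchange_name?(name)}"
end
```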

Examples:

Instantiating a fanout exchange using constructor


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    AMQP::Exchange.new(channel, :fanout, "search.index.updates") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Instantiating a direct exchange using Channel#direct


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    channel.direct("email.replies_listener") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Parameters:

  • channel (Channel)

    AMQP channel this exchange is associated with

  • type (Symbol)

    Exchange type

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :no_declare (Boolean) — default: true

    If set, the exchange declaration command won’t be sent to the broker. This allows you to forcefully skip declaration. We recommend that only experienced developers consider this option.

  • :default_routing_key (String) — default: nil

    Default routing key that will be used by #publish when a routing key is not passed explicitly. It is perfectly fine for applications to always specify a routing key to #publish.

  • :arguments (Hash) — default: nil

    A hash of optional arguments with the declaration. Some brokers implement AMQP extensions using x-prefixed declaration arguments.

Yields:

  • (exchange, declare_ok)

    Yields successfully declared exchange instance and AMQP method (exchange.declare-ok) instance. The latter is optional.

Yield Parameters:

  • exchange (Exchange)

    Exchange that is successfully declared and is ready to be used.

  • declare_ok (AMQP::Protocol::Exchange::DeclareOk)

    AMQP method (exchange.declare-ok) instance.

See Also:



# File 'lib/amqp/exchange.rb', line 310

def initialize(channel, type, name, opts = {}, &block)
  @channel             = channel
  @type                = type
  @opts                = self.class.add_default_options(type, name, opts, block)
  @default_routing_key = opts[:routing_key] || opts[:key] || AMQ::Protocol::EMPTY_STRING
  @name                = name unless name.empty?

  @status                  = :unknown
  @default_publish_options = (opts.delete(:default_publish_options) || {
      :routing_key  => @default_routing_key,
      :mandatory    => false
    }).freeze

  @default_headers = (opts.delete(:default_headers) || {
      :content_type => DEFAULT_CONTENT_TYPE,
      :persistent   => false,
      :priority     => 0
    }).freeze

  if !(BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/i)
    raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type)
  end

  @channel    = channel
  @name       = name
  @type       = type

  # register pre-declared exchanges
  if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/
    @channel.register_exchange(self)
  end

  super(channel.connection)

  # The AMQP 0.8 specification (as well as 0.9.1) in 1.1.4.2 mentions
  # that Exchange.Declare-Ok confirms the name of the exchange (because
  # of automatically-named), which is logical to interpret that this
  # functionality should be the same as for Queue (though it isn't
  # explicitly told in the specification). In fact, RabbitMQ (and
  # probably other implementations as well) doesn't support it and
  # there is a default exchange with an empty name (so-called default
  # or nameless exchange), so if we'd send Exchange.Declare(exchange=""),
  # then RabbitMQ interprets it as if we'd try to redefine this default
  # exchange so it'd produce an error.
  unless name == "amq.#{type}" or name.empty? or opts[:no_declare]
    @status = :opening

    unless @opts[:no_declare]
      @channel.once_open do
        if block
          shim = Proc.new do |exchange, declare_ok|
            case block.arity
            when 1 then block.call(exchange)
            else
              block.call(exchange, declare_ok)
            end
          end

          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments], &shim)
        else
          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments])
        end
      end
    end
  else
    # Call the callback immediately, as given exchange is already
    # declared.
    @status = :opened
    block.call(self) if block
  end

  @on_declare = block
end
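The arity shim in the constructor lets callers pass a callback that takes either one argument (the exchange) or two (the exchange and the declare-ok method). A standalone sketch of the same dispatch, using lambdas so that the argument count is enforced; call_with_supported_arity is a hypothetical helper name and the symbols stand in for real objects.

```ruby
# Hypothetical standalone version of the arity dispatch used in the constructor.
def call_with_supported_arity(callback, exchange, declare_ok)
  case callback.arity
  when 1 then callback.call(exchange)
  else        callback.call(exchange, declare_ok)
  end
end

one_arg  = ->(exchange) { [exchange] }
two_args = ->(exchange, declare_ok) { [exchange, declare_ok] }

# :exchange and :declare_ok are placeholders for the real instances.
puts call_with_supported_arity(one_arg, :exchange, :declare_ok).inspect
puts call_with_supported_arity(two_args, :exchange, :declare_ok).inspect
```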

Instance Attribute Details

#arguments ⇒ Hash (readonly)

Returns additional arguments given on exchange declaration. Typically used by AMQP extensions.

Returns:

  • (Hash)

    Additional arguments given on exchange declaration. Typically used by AMQP extensions.



# File 'lib/amqp/exchange.rb', line 206

def arguments
  @arguments
end

#channel ⇒ Channel (readonly)

Returns:



# File 'lib/amqp/exchange.rb', line 203

def channel
  @channel
end

#default_routing_key ⇒ String (readonly) Also known as: key

Returns:

  • (String)


# File 'lib/amqp/exchange.rb', line 210

def default_routing_key
  @default_routing_key
end

#name ⇒ String (readonly)

Returns:

  • (String)


# File 'lib/amqp/exchange.rb', line 184

def name
  @name
end

#on_declare ⇒ #call

Returns a callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.

Returns:

  • (#call)

    A callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.



# File 'lib/amqp/exchange.rb', line 200

def on_declare
  @on_declare
end

#opts ⇒ Hash

Options hash this exchange instance was instantiated with

Returns:

  • (Hash)


# File 'lib/amqp/exchange.rb', line 196

def opts
  @opts
end

#status ⇒ Symbol (readonly)

Returns:

  • (Symbol)


# File 'lib/amqp/exchange.rb', line 192

def status
  @status
end

#type ⇒ Symbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


# File 'lib/amqp/exchange.rb', line 188

def type
  @type
end

Class Method Details

.default(channel = nil) ⇒ Exchange

Note:

Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.

The default exchange. The default exchange is a direct exchange that is predefined. It cannot be removed. Every queue is bound to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.

Examples:

Publishing a message to the tasks queue

channel     = AMQP::Channel.new(connection)
tasks_queue = channel.queue("tasks")
AMQP::Exchange.default(channel).publish("make clean", :routing_key => "tasks")

Parameters:

  • channel (AMQP::Channel) (defaults to: nil)

    Channel to use. If not given, new AMQP channel will be opened on the default AMQP connection (accessible as AMQP.connection).

Returns:

  • (Exchange)

    An instance that corresponds to the default exchange (of type direct).

See Also:



# File 'lib/amqp/exchange.rb', line 178

def self.default(channel = nil)
  self.new(channel || AMQP::Channel.new, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

Instance Method Details

#auto_deleted? ⇒ Boolean Also known as: auto_deletable?

Returns true if this exchange is automatically deleted when it is no longer used

Returns:

  • (Boolean)

    true if this exchange is automatically deleted when it is no longer used



# File 'lib/amqp/exchange.rb', line 596

def auto_deleted?
  !!@opts[:auto_delete]
end

#auto_recover ⇒ Object

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



# File 'lib/amqp/exchange.rb', line 844

def auto_recover
  self.redeclare unless predefined?
end

#basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false) ⇒ Object



# File 'lib/amqp/exchange.rb', line 770

def basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false)
  headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers)
  @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, false, @connection.frame_max), @channel)

  # publisher confirms support. MK.
  @channel.exec_callback(:after_publish)
  self
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 815

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#bind(source, opts = {}, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 700

def bind(source, opts = {}, &block)
  source = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments = opts[:arguments] || {}
  nowait = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:bind, &block)
      @channel.exchanges_awaiting_bind_ok.push(self)
    end
  end
  self
end
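The option handling at the top of #bind (and #unbind) can be read in isolation: :key takes precedence over :routing_key, the routing key falls back to an empty string, and the call is treated as nowait when no block is given. A standalone sketch of that resolution; resolve_bind_options is a hypothetical helper name and the plain "" stands in for AMQ::Protocol::EMPTY_STRING.

```ruby
# Hypothetical helper mirroring the option resolution in #bind and #unbind.
def resolve_bind_options(opts = {}, &block)
  {
    :routing_key => opts[:key] || opts[:routing_key] || "",
    :arguments   => opts[:arguments] || {},
    # Without a block there is nothing to call back, so no reply is requested.
    :nowait      => opts[:nowait] || block.nil?
  }
end

puts resolve_bind_options.inspect
puts resolve_bind_options(:routing_key => "events.#") { |bind_ok| bind_ok }.inspect
```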

#callback ⇒ #call

Deprecated.

Compatibility alias for #on_declare.

Returns:

  • (#call)


# File 'lib/amqp/exchange.rb', line 218

def callback
  @on_declare
end

#custom_type? ⇒ Boolean

Returns true if this exchange is of a custom type (begins with x-)

Returns:

  • (Boolean)

    true if this exchange is of a custom type (begins with x-)



# File 'lib/amqp/exchange.rb', line 416

def custom_type?
  @type.to_s =~ /^x-.+/i
end
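Together with BUILTIN_TYPES, the x- prefix check decides which exchange types the constructor accepts. A standalone sketch of that validation; the constant and method names are hypothetical, and "x-consistent-hash" is used only as an example of an x-prefixed type name.

```ruby
# Hypothetical standalone version of the type check performed in the constructor.
ACCEPTED_BUILTIN_TYPES = [:fanout, :direct, :topic, :headers].freeze

def acceptable_exchange_type?(type)
  # Built-in types are accepted as symbols or strings; custom types must start with "x-".
  ACCEPTED_BUILTIN_TYPES.include?(type.to_sym) || type.to_s.match?(/\Ax-.+/i)
end

[:direct, "topic", "x-consistent-hash", :queue].each do |type|
  puts "#{type.inspect}: #{acceptable_exchange_type?(type)}"
end
```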

#delete(opts = {}, &block) ⇒ NilClass

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.

Examples:

Deleting an exchange


channel  = AMQP::Channel.new(connection)
exchange = channel.direct("search.indexing")
exchange.delete

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :nowait (Boolean) — default: false

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :if_unused (Boolean) — default: false

    If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

Returns:

  • (NilClass)

    nil



# File 'lib/amqp/exchange.rb', line 568

def delete(opts = {}, &block)
  @channel.once_open do
    exchange_delete(opts.fetch(:if_unused, false), opts.fetch(:nowait, false), &block)
  end

  # backwards compatibility
  nil
end

#direct? ⇒ Boolean

Returns true if this exchange is of type `direct`

Returns:

  • (Boolean)

    true if this exchange is of type `direct`



# File 'lib/amqp/exchange.rb', line 398

def direct?
  @type == :direct
end

#durable? ⇒ Boolean

Note:

Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.

Returns true if this exchange is durable

Returns:

  • (Boolean)

    true if this exchange is durable



# File 'lib/amqp/exchange.rb', line 581

def durable?
  !!@opts[:durable]
end

#exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 611

def exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block)
  # for re-declaration
  @passive     = passive
  @durable     = durable
  @auto_delete = auto_delete
  @arguments   = arguments
  @internal    = internal

  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, internal, nowait, arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end

#exchange_delete(if_unused = false, nowait = false, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 647

def exchange_delete(if_unused = false, nowait = false, &block)
  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait))

  unless nowait
    self.define_callback(:delete, &block)

    # TODO: delete itself from exchanges cache
    @channel.exchanges_awaiting_delete_ok.push(self)
  end

  self
end

#fanout? ⇒ Boolean

Returns true if this exchange is of type `fanout`

Returns:

  • (Boolean)

    true if this exchange is of type `fanout`



# File 'lib/amqp/exchange.rb', line 392

def fanout?
  @type == :fanout
end

#handle_bind_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 863

def handle_bind_ok(method)
  self.exec_callback_once(:bind, method)
end

#handle_declare_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 857

def handle_declare_ok(method)
  @channel.register_exchange(self)

  self.exec_callback_once_yielding_self(:declare, method)
end

#handle_delete_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 871

def handle_delete_ok(method)
  self.exec_callback_once(:delete, method)
end

#handle_unbind_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 867

def handle_unbind_ok(method)
  self.exec_callback_once(:unbind, method)
end

#headers? ⇒ Boolean

Returns true if this exchange is of type `headers`

Returns:

  • (Boolean)

    true if this exchange is of type `headers`



# File 'lib/amqp/exchange.rb', line 410

def headers?
  @type == :headers
end

#internal? ⇒ Boolean

Returns true if this exchange is an internal exchange

Returns:

  • (Boolean)

    true if this exchange is an internal exchange



# File 'lib/amqp/exchange.rb', line 426

def internal?
  @opts[:internal]
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 798

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
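The "only one callback, the last one wins" behaviour described above can be illustrated with a toy registry. ToyCallbackRegistry is a hypothetical class written for this sketch; it borrows the method names redefine_callback and exec_callback for readability but is not the gem's Callbacks module.

```ruby
# Toy registry illustrating single-callback-per-event semantics (hypothetical class).
class ToyCallbackRegistry
  def initialize
    @callbacks = {}
  end

  # Store the block for the event, replacing any previously defined one.
  def redefine_callback(event, &block)
    @callbacks[event] = block
    self
  end

  # Run the callback for the event, if one is defined.
  def exec_callback(event, *args)
    callback = @callbacks[event]
    callback && callback.call(*args)
  end
end

registry = ToyCallbackRegistry.new
registry.redefine_callback(:after_connection_interruption) { "first handler" }
registry.redefine_callback(:after_connection_interruption) { "second handler" }

# Only the handler defined last is executed.
puts registry.exec_callback(:after_connection_interruption)
```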

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 829

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#on_return(&block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 781

def on_return(&block)
  self.redefine_callback(:return, &block)

  self
end

#predefined? ⇒ Boolean

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



# File 'lib/amqp/exchange.rb', line 421

def predefined?
  @name && ((@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i))
end

#publish(payload, options = {}, &block) ⇒ Exchange

Note:

The optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a “delivery callback”. The only way to guarantee delivery in a distributed environment is to use an acknowledgement mechanism, such as AMQP transactions or the lightweight “publisher confirms” RabbitMQ extension supported by the amqp gem. See the Durability and message persistence and Working With Exchanges guides for details.

Note:

Please make sure you read the Durability and message persistence guide that covers exchange durability vs. message persistence.

Publishes message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by exchange type and configuration as well as message attributes (like :routing_key or message headers).

Published data is opaque and not modified by the Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.

Data serialization

Note that this method calls #to_s on the payload argument value. Applications are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or another serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across the network, so BSON may be a good fit.

Publishing and message persistence

In cases when your application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:

  • Persistent messages
  • Messages acknowledgements
  • Transactions
  • (a RabbitMQ-specific extension) Publisher confirms

This is a broad topic and we dedicate a separate guide, Durability and message persistence, to it.

Publishing callback and guarantees it DOES NOT offer

The exact moment when a message is published is not determined and depends on many factors, including the machine’s networking stack configuration, so the (optional) block this method takes is scheduled for the next event loop tick, and data is staged for delivery for the current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back.

Event loop blocking

When intermixing publishing of many messages with other workload that may take some time, event loop blocking may affect the performance. There are several ways to avoid it:

  • Run EventMachine in a separate thread.
  • Use EventMachine.next_tick.
  • Use EventMachine.defer to offload operation to EventMachine thread pool.

TBD: this subject is worth a separate guide

Sending one-off messages

If you need to send a one-off message and then stop the event loop, pass a block to #publish that will be executed after the message is pushed down the network stack, and use Session#disconnect to properly tear down the AMQP connection (see the example under the Examples section below).

Examples:

Publishing a one-off message and properly closing AMQP connection then stopping the event loop:

exchange.publish(data) do
  connection.disconnect { EventMachine.stop }
end

Publishing without routing key

exchange = channel.fanout('search.indexer')
# fanout exchanges deliver messages to bound queues unconditionally,
# so routing key is unnecessary here
exchange.publish("some data")

Publishing with a routing key

exchange = channel.direct('search.indexer')
exchange.publish("some data", :routing_key => "search.index.updates")

Parameters:

  • payload (#to_s)

    Message payload (content). Note that this method calls #to_s on payload argument value. You are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or other serialization library).

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :routing_key (String) — default: nil

    Specifies message routing key. Routing key determines what queues messages are delivered to (exact routing algorithms vary between exchange types).

  • :mandatory (Boolean) — default: false

    This flag tells the server how to react if the message cannot be routed to a queue. If the message is mandatory, the server will return the unroutable message back to the client with the basic.return AMQP method. If the message is not mandatory, the server silently drops the message.

  • :persistent (Boolean) — default: false

    When true, this message will be persisted to disk and remain in the queue until it is consumed. When false, the message is only kept in a transient store and will be lost in case of server restart. When performance and latency are more important than durability, set :persistent => false. If durability is more important, set :persistent => true.

  • :content_type (String) — default: application/octet-stream

    Content-type of message payload.

Returns:



# File 'lib/amqp/exchange.rb', line 533

def publish(payload, options = {}, &block)
  opts    = @default_publish_options.merge(options)

  @channel.once_open do
    properties                 = @default_headers.merge(options)
    properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
    basic_publish(payload.to_s, opts[:key] || opts[:routing_key] || @default_routing_key, properties, opts[:mandatory])

    # don't pass block to AMQP::Exchange#publish because it will be executed
    # immediately and we want to do it later. See ruby-amqp/amqp/#67 MK.
    EventMachine.next_tick(&block) if block
  end

  self
end
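The listing above translates the :persistent option into the AMQP delivery_mode property (2 for persistent, 1 for transient) after merging the caller's options over frozen defaults. A standalone sketch of just that step; build_properties and the constant name are hypothetical, and the defaults are copied from the constructor listing on this page.

```ruby
# Hypothetical standalone version of the property-building step in #publish.
SKETCH_DEFAULT_HEADERS = {
  :content_type => "application/octet-stream",
  :persistent   => false,
  :priority     => 0
}.freeze

def build_properties(options = {})
  # Hash#merge returns a new, unfrozen Hash, so the frozen defaults stay untouched.
  properties = SKETCH_DEFAULT_HEADERS.merge(options)
  # :persistent is a convenience flag; the protocol property is delivery_mode.
  properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
  properties
end

puts build_properties.inspect
puts build_properties(:persistent => true, :content_type => "application/json").inspect
```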

#redeclare(&block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 631

def redeclare(&block)
  nowait = block.nil?
  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, @internal, nowait, @arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end

#reset ⇒ Object

Resets exchange state. Useful for error handling.



# File 'lib/amqp/exchange.rb', line 603

def reset
  initialize(@channel, @type, @name, @opts)
end

#topic? ⇒ Boolean

Returns true if this exchange is of type `topic`

Returns:

  • (Boolean)

    true if this exchange is of type `topic`



# File 'lib/amqp/exchange.rb', line 404

def topic?
  @type == :topic
end

#transient? ⇒ Boolean Also known as: temporary?

Note:

Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.

Returns true if this exchange is transient (non-durable)

Returns:

  • (Boolean)

    true if this exchange is transient (non-durable)



# File 'lib/amqp/exchange.rb', line 589

def transient?
  !self.durable?
end

#unbind(source, opts = {}, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 750

def unbind(source, opts = {}, &block)
  source = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments = opts[:arguments] || {}
  nowait = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:unbind, &block)
      @channel.exchanges_awaiting_unbind_ok.push(self)
    end
  end
  self
end