Class: MQ

Inherits:
Object
  • Object
show all
Includes:
AMQP, EM::Deferrable
Defined in:
lib/mq.rb,
lib/mq.rb,
lib/mq.rb,
lib/mq/rpc.rb,
lib/mq/queue.rb,
lib/mq/header.rb,
lib/mq/logger.rb,
lib/mq/exchange.rb

Overview

– convenience wrapper (read: HACK) for thread-local MQ object

Defined Under Namespace

Classes: Error, Exchange, Header, Logger, Queue, RPC

Constant Summary

Constants included from AMQP

AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_FILE, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AMQP

client, client=, connect, fork, settings, start, stop

Constructor Details

#initialize(connection = nil) ⇒ MQ

Returns a new channel. A channel is a bidirectional virtual connection between the client and the AMQP server. Elsewhere in the library the channel is referred to in parameter lists as mq.

Optionally takes the result from calling AMQP::connect.

Rarely called directly by client code. This is implicitly called by most instance methods. See #method_missing.

EM.run do
  channel = MQ.new
end

EM.run do
  channel = MQ.new AMQP::connect
end


139
140
141
142
143
144
145
146
147
148
# File 'lib/mq.rb', line 139

def initialize connection = nil
  raise 'MQ can only be used from within EM.run{}' unless EM.reactor_running?

  @connection = connection || AMQP.start

  conn.callback { |c|
    @channel = c.add_channel(self)
    send Protocol::Channel::Open.new
  }
end

Class Attribute Details

.loggingObject

Returns the value of attribute logging.



14
15
16
# File 'lib/mq.rb', line 14

def logging
  @logging
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



150
151
152
# File 'lib/mq.rb', line 150

def channel
  @channel
end

#connectionObject (readonly) Also known as: conn

Returns the value of attribute connection.



150
151
152
# File 'lib/mq.rb', line 150

def connection
  @connection
end

Class Method Details

.defaultObject



865
866
867
868
# File 'lib/mq.rb', line 865

def MQ.default
  # TODO: clear this when connection is closed
  Thread.current[:mq] ||= MQ.new
end

.error(msg = nil, &blk) ⇒ Object

Define a message and callback block to be executed on all errors.



744
745
746
747
748
749
750
# File 'lib/mq.rb', line 744

def self.error msg = nil, &blk
  if blk
    @error_callback = blk
  else
    @error_callback.call(msg) if @error_callback and msg
  end
end

.idObject

unique identifier



861
862
863
# File 'lib/mq.rb', line 861

def MQ.id
  Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end

.method_missing(meth, *args, &blk) ⇒ Object

Allows for calls to all MQ instance methods. This implicitly calls MQ.new so that a new channel is allocated for subsequent operations.



872
873
874
# File 'lib/mq.rb', line 872

def MQ.method_missing meth, *args, &blk
  MQ.default.__send__(meth, *args, &blk)
end

Instance Method Details

#closeObject

Schedules the request to close the channel to be sent. Actual closing of the channels happens when Protocol::Channel::CloseOk is received from broker.



731
732
733
734
735
736
737
738
739
740
# File 'lib/mq.rb', line 731

def close
  if @deferred_status == :succeeded
    send Protocol::Channel::Close.new(:reply_code => 200,
                                      :reply_text => 'bye',
                                      :method_id => 0,
                                      :class_id => 0)
  else
    @closing = true
  end
end

#connected?Boolean

Tests connection status of associated AMQP connection

Returns:

  • (Boolean)


836
837
838
# File 'lib/mq.rb', line 836

def connected?
  @connection.connected?
end

#consumersObject

Queue objects keyed on their consumer tags.

Not typically called by client code.



810
811
812
# File 'lib/mq.rb', line 810

def consumers
  @consumers ||= {}
end

#direct(name = 'amq.direct', opts = {}) ⇒ Object

Defines, intializes and returns an Exchange to act as an ingress point for all published messages.

Direct

A direct exchange is useful for 1:1 communication between a publisher and subscriber. Messages are routed to the queue with a binding that shares the same name as the exchange. Alternately, the messages are routed to the bound queue that shares the same name as the routing key used for defining the exchange. This exchange type does not honor the :key option when defining a new instance with a name. It will honor the :key option if the exchange name is the empty string. Allocating this exchange without a name or with the empty string will use the internal ‘amq.direct’ exchange.

Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.

# exchange is named 'foo'
exchange = MQ.direct('foo')

# or, the exchange can use the default name (amq.direct) and perform
# routing comparisons using the :key
exchange = MQ.direct("", :key => 'foo')
exchange.publish('some data') # will be delivered to queue bound to 'foo'

queue = MQ.queue('foo')
# can receive data since the queue name and the exchange key match exactly
queue.pop { |data| puts "received data [#{data}]" }

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.

Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.

  • :auto_delete => true | false (default false)

If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

If the exchange has been previously declared, this option is ignored on subsequent declarations.

  • :internal => true | false (default false)

If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Exceptions

Doing any of these activities are illegal and will raise MQ:Error.

  • redeclare an already-declared exchange to a different type

  • :passive => true and the exchange does not exist (NOT_FOUND)



335
336
337
# File 'lib/mq.rb', line 335

def direct name = 'amq.direct', opts = {}
  exchanges[name] ||= Exchange.new(self, :direct, name, opts)
end

#exchangesObject

Returns a hash of all the exchange proxy objects.

Not typically called by client code.



777
778
779
# File 'lib/mq.rb', line 777

def exchanges
  @exchanges ||= {}
end

#fanout(name = 'amq.fanout', opts = {}) ⇒ Object

Defines, intializes and returns an Exchange to act as an ingress point for all published messages.

Fanout

A fanout exchange is useful for 1:N communication where one publisher feeds multiple subscribers. Like direct exchanges, messages published to a fanout exchange are delivered to queues whose name matches the exchange name (or are bound to that exchange name). Each queue gets its own copy of the message.

Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.

Like the direct exchange type, this exchange type does not honor the :key option when defining a new instance with a name. It will honor the :key option if the exchange name is the empty string. Allocating this exchange without a name or with the empty string will use the internal ‘amq.fanout’ exchange.

EM.run do
  clock = MQ.fanout('clock')
  EM.add_periodic_timer(1) do
    puts "\npublishing #{time = Time.now}"
    clock.publish(Marshal.dump(time))
  end

  amq = MQ.queue('every second')
  amq.bind(MQ.fanout('clock')).subscribe do |time|
    puts "every second received #{Marshal.load(time)}"
  end

  # note the string passed to #bind
  MQ.queue('every 5 seconds').bind('clock').subscribe do |time|
    time = Marshal.load(time)
    puts "every 5 seconds received #{time}" if time.strftime('%S').to_i%5 == 0
  end
end

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.

Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.

  • :auto_delete => true | false (default false)

If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

If the exchange has been previously declared, this option is ignored on subsequent declarations.

  • :internal => true | false (default false)

If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Exceptions

Doing any of these activities are illegal and will raise MQ:Error.

  • redeclare an already-declared exchange to a different type

  • :passive => true and the exchange does not exist (NOT_FOUND)



421
422
423
# File 'lib/mq.rb', line 421

def fanout name = 'amq.fanout', opts = {}
  exchanges[name] ||= Exchange.new(self, :fanout, name, opts)
end

#get_queueObject

Yields a (Mutex-synchronized) FIFO queue of consumers that issued Protocol::Basic::Get requests (that is, called Queue#pop)

Not typically called by client code.



792
793
794
795
796
797
798
# File 'lib/mq.rb', line 792

def get_queue
  if block_given?
    (@get_queue_mutex ||= Mutex.new).synchronize {
      yield(@get_queue ||= [])
    }
  end
end

#headers(name = 'amq.match', opts = {}) ⇒ Object

Defines, intializes and returns an Exchange to act as an ingress point for all published messages.

Headers

A headers exchange allows for messages to be published to an exchange

Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.

As part of the AMQP standard, each server should predeclare a headers exchange called ‘amq.match’ (this is not required by the standard). Allocating this exchange without a name or with the empty string will use the internal ‘amq.match’ exchange.

TODO: The classic example is …

When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing arguments for matching against the headers in published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified (in RabbitMQ at least), it defaults to “all”.

A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers ), while a value of ‘any’ implies that at least one should match (ie. it does an OR).

TODO: document behavior when either the binding or the message is missing

a header present in the other

TODO: insert example

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.

Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.

  • :auto_delete => true | false (default false)

If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

If the exchange has been previously declared, this option is ignored on subsequent declarations.

  • :internal => true | false (default false)

If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Exceptions

Doing any of these activities are illegal and will raise MQ:Error.

  • redeclare an already-declared exchange to a different type

  • :passive => true and the exchange does not exist (NOT_FOUND)

  • using a value other than “any” or “all” for “x-match”



613
614
615
# File 'lib/mq.rb', line 613

def headers name = 'amq.match', opts = {}
  exchanges[name] ||= Exchange.new(self, :headers, name, opts)
end

#prefetch(count) ⇒ Object

Asks the broker to set prefetch_count (size of the prefetch buffer) that the broker will maintain for outstanding unacknowledged messages on a this channel. This is Applications typically set the prefetch count to 1, which means the processing speed of the consumer exerts complete backpressure on the flow of messages in that channel.



757
758
759
760
761
# File 'lib/mq.rb', line 757

def prefetch(count)
  @prefetch_count = count
  send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => count, :global => false)
  self
end

#process_frame(frame) ⇒ Object

May raise a MQ::Error exception when the frame payload contains a Protocol::Channel::Close object.

This usually occurs when a client attempts to perform an illegal operation. A short, and incomplete, list of potential illegal operations follows:

  • publish a message to a deleted exchange (NOT_FOUND)

  • declare an exchange using the reserved ‘amq.’ naming structure (ACCESS_REFUSED)



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/mq.rb', line 162

def process_frame frame
  log :received, frame

  case frame
    when Frame::Header
      @header = frame.payload
      @body = ''
      check_content_completion

    when Frame::Body
      @body << frame.payload
      check_content_completion

    when Frame::Method
      case method = frame.payload
        when Protocol::Channel::OpenOk
          send Protocol::Access::Request.new(:realm => '/data',
                                             :read => true,
                                             :write => true,
                                             :active => true,
                                             :passive => true)

        when Protocol::Access::RequestOk
          @ticket = method.ticket
          callback {
            send Protocol::Channel::Close.new(:reply_code => 200,
                                              :reply_text => 'bye',
                                              :method_id => 0,
                                              :class_id => 0)
          } if @closing
          succeed

        when Protocol::Basic::CancelOk
          if @consumer = consumers[method.consumer_tag]
            @consumer.cancelled
          else
            MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
          end

        when Protocol::Queue::DeclareOk
          queues[method.queue].receive_status method

        when Protocol::Basic::GetOk
          @method = method
          @header = nil
          @body = ''

          @consumer = get_queue { |q| q.shift }
          MQ.error "No pending Basic.GetOk requests" unless @consumer

        when Protocol::Basic::Deliver
          @method = method
          @header = nil
          @body = ''

          @consumer = consumers[method.consumer_tag]
          MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer

        when Protocol::Basic::GetEmpty
          if @consumer = get_queue { |q| q.shift }
            @consumer.receive nil, nil
          else
            MQ.error "Basic.GetEmpty for invalid consumer"
          end

        when Protocol::Channel::Close
          raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"

        when Protocol::Channel::CloseOk
          @closing = false
          conn.callback { |c|
            c.channels.delete @channel
            c.close if c.channels.empty?
          }

        when Protocol::Basic::ConsumeOk
          if @consumer = consumers[method.consumer_tag]
            @consumer.confirm_subscribe
          else
            MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
          end
      end
  end
end

#queue(name, opts = {}) ⇒ Object

Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.

Like an Exchange, queue names starting with ‘amq.’ are reserved for internal use. Attempts to create queue names in violation of this reservation will raise MQ:Error (ACCESS_REFUSED).

It is not supported to create a queue without a name; some string (even the empty string) must be passed in the name parameter.

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue (though it is allowed).

Again, note the durability property on a queue has no influence on the persistence of published messages. A durable queue containing transient messages will flush those messages on a restart.

If the queue has already been declared, any redeclaration will ignore this setting. A queue may only be declared durable the first time when it is created.

  • :exclusive => true | false (default false)

Exclusive queues may only be consumed from by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Only a single consumer is allowed to remove messages from this queue.

The default is a shared queue. Multiple clients may consume messages from this queue.

Attempting to redeclare an already-declared queue as :exclusive => true will raise MQ:Error.

  • :auto_delete = true | false (default false)

If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted.

The server waits for a short period of time before determining the queue is unused to give time to the client code to bind an exchange to it.

If the queue has been previously declared, this option is ignored on subsequent declarations.

Any remaining messages in the queue will be purged when the queue is deleted regardless of the message’s persistence setting.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



683
684
685
686
# File 'lib/mq.rb', line 683

def queue name, opts = {}
  #noinspection RubyArgCount
  queues[name] ||= Queue.new(self, name, opts)
end

#queuesObject

Returns a hash of all the queue proxy objects.

Not typically called by client code.



784
785
786
# File 'lib/mq.rb', line 784

def queues
  @queues ||= {}
end

#recover(requeue = false) ⇒ Object

Asks the broker to redeliver all unacknowledged messages on this channel.

  • :requeue - (default false)

If this parameter is false, the message will be redelivered to the original recipient. If this flag is true, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.



769
770
771
772
# File 'lib/mq.rb', line 769

def recover requeue = false
  send Protocol::Basic::Recover.new(:requeue => requeue)
  self
end

#resetObject

Resets and reinitializes the channel and its queues/exchanges



816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
# File 'lib/mq.rb', line 816

def reset
  @deferred_status = nil
  @channel = nil
  initialize @connection

  @consumers = {}

  exs = @exchanges
  @exchanges = {}
  exs.each { |_, e| e.reset } if exs

  qus = @queues
  @queues = {}
  qus.each { |_, q| q.reset } if qus

  prefetch(@prefetch_count) if @prefetch_count
end

#rpc(name, obj = nil) ⇒ Object

Takes a queue name and optional object.

The optional object may be a class name, module name or object instance. When given a class or module name, the object is instantiated during this setup. The passed queue is automatically subscribed to so it passes all messages (and their arguments) to the object.

Marshalling and unmarshalling the objects is handled internally. This marshalling is subject to the same restrictions as defined in the Marshal standard library. See that documentation for further reference.

When the optional object is not passed, the returned rpc reference is used to send messages and arguments to the queue. See #method_missing which does all of the heavy lifting with the proxy. Some client elsewhere must call this method with the optional block so that there is a valid destination. Failure to do so will just enqueue marshalled messages that are never consumed.

EM.run do
  server = MQ.rpc('hash table node', Hash)

  client = MQ.rpc('hash table node')
  client[:now] = Time.now
  client[:one] = 1

  client.values do |res|
    p 'client', :values => res
  end

  client.keys do |res|
    p 'client', :keys => res
    EM.stop_event_loop
  end
end


724
725
726
# File 'lib/mq.rb', line 724

def rpc name, obj = nil
  rpcs[name] ||= RPC.new(self, name, obj)
end

#rpcsObject

Returns a hash of all rpc proxy objects.

Not typically called by client code.



803
804
805
# File 'lib/mq.rb', line 803

def rpcs
  @rcps ||= {}
end

#send(*args) ⇒ Object

Sends each argument through @connection, setting its ticket property to the @ticket received in most recent Protocol::Access::RequestOk. This operation is Mutex-synchronized.



250
251
252
253
254
255
256
257
258
259
260
# File 'lib/mq.rb', line 250

def send *args
  conn.callback { |c|
    (@_send_mutex ||= Mutex.new).synchronize do
      args.each do |data|
        data.ticket = @ticket if @ticket and data.respond_to? :ticket=
        log :sending, data
        c.send data, :channel => @channel
      end
    end
  }
end

#topic(name = 'amq.topic', opts = {}) ⇒ Object

Defines, intializes and returns an Exchange to act as an ingress point for all published messages.

Topic

A topic exchange allows for messages to be published to an exchange tagged with a specific routing key. The Exchange uses the routing key to determine which queues to deliver the message. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.

This is the only exchange type to honor the key hash key for all cases.

Any published message, regardless of its persistence setting, is thrown away by the exchange when there are no queues bound to it.

As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’ (this is not required by the standard). Allocating this exchange without a name or with the empty string will use the internal ‘amq.topic’ exchange.

The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple Computer would look like:

'stock.us.aapl'

For a foreign stock, it may look like:

'stock.de.dax'

When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing a routing key for matching against the published routing key.

EM.run do
  exch = MQ.topic("stocks")
  keys = ['stock.us.aapl', 'stock.de.dax']

  EM.add_periodic_timer(1) do # every second
    puts
    exch.publish(10+rand(10), :routing_key => keys[rand(2)])
  end

  # match against one dot-separated item
  MQ.queue('us stocks').bind(exch, :key => 'stock.us.*').subscribe do |price|
    puts "us stock price [#{price}]"
  end

  # match against multiple dot-separated items
  MQ.queue('all stocks').bind(exch, :key => 'stock.#').subscribe do |price|
    puts "all stocks: price [#{price}]"
  end

  # require exact match
  MQ.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price|
    puts "dax price [#{price}]"
  end
end

For matching, the ‘*’ (asterisk) wildcard matches against one dot-separated item only. The ‘#’ wildcard (hash or pound symbol) matches against 0 or more dot-separated items. If none of these symbols are used, the exchange performs a comparison looking for an exact match.

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

A transient exchange (the default) is stored in memory-only. The exchange and all bindings will be lost on a server restart. It makes no sense to publish a persistent message to a transient exchange.

Durable exchanges and their bindings are recreated upon a server restart. Any published messages not routed to a bound queue are lost.

  • :auto_delete => true | false (default false)

If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

If the exchange has been previously declared, this option is ignored on subsequent declarations.

  • :internal => true | false (default false)

If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Exceptions

Doing any of these activities are illegal and will raise MQ:Error.

  • redeclare an already-declared exchange to a different type

  • :passive => true and the exchange does not exist (NOT_FOUND)



533
534
535
# File 'lib/mq.rb', line 533

def topic name = 'amq.topic', opts = {}
  exchanges[name] ||= Exchange.new(self, :topic, name, opts)
end