Class: MarchHare::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/channel.rb

Overview

## Channels in RabbitMQ

To quote AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

## Opening Channels

Channels can be opened either via ‘MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly:

This will automatically allocate a channel id.

## Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

## Higher-level API

MarchHare offers two sets of methods on Channel: known as higher-level and lower-level APIs, respectively. Higher-level API mimics amqp gem API where exchanges and queues are objects (instance of Exchange and Queue, respectively). Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

### Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

### Exchange Operations In Higher-level API

## Channel Qos (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.

## Channel IDs

Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.

## Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying ‘channel.close` method information.

Examples:

conn = MarchHare.new
conn.start

ch   = conn.create_channel

ch  = conn.create_channel
ch.close

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue MarchHare::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "rubymarchhare.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue MarchHare::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

See Also:

Defined Under Namespace

Classes: BlockConfirmListener, BlockReturnListener

Instance Attribute Summary collapse

Exchanges collapse

Queues collapse

basic.* collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, delegate) ⇒ Channel



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/march_hare/channel.rb', line 121

def initialize(session, delegate)
  @connection = session
  @delegate   = delegate

  @exchanges      = JavaConcurrent::ConcurrentHashMap.new
  @queues         = JavaConcurrent::ConcurrentHashMap.new
  # we keep track of consumers in part to gracefully shut down their
  # executors when the channel is closed. This frees library users
  # from having to worry about this. MK.
  @consumers      = JavaConcurrent::ConcurrentHashMap.new
  @shutdown_hooks = Array.new
  @confirm_hooks  = Array.new
  @recoveries_counter = JavaConcurrent::AtomicInteger.new(0)

  # An opt-in setting that instructs the channel to cancel all consumers
  # before closing. This helps reduce the probability of in-flight deliveries
  # right before channel closure.
  @cancel_consumers_before_closing = false

  on_shutdown do |ch, cause|
    ch.gracefully_shut_down_consumers
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



1014
1015
1016
# File 'lib/march_hare/channel.rb', line 1014

def method_missing(selector, *args)
  @delegate.__send__(selector, *args)
end

Instance Attribute Details

#consumersArray<MarchHare::Consumer> (readonly)



118
119
120
# File 'lib/march_hare/channel.rb', line 118

def consumers
  @consumers
end

#recoveries_counterObject (readonly)

Returns the value of attribute recoveries_counter.



341
342
343
# File 'lib/march_hare/channel.rb', line 341

def recoveries_counter
  @recoveries_counter
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.



802
803
804
805
806
# File 'lib/march_hare/channel.rb', line 802

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end

#automatically_recover(session, java_connection) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/march_hare/channel.rb', line 235

def automatically_recover(session, java_connection)
  logger.debug("channel: begin automatic connection recovery")

  jch = java_connection.create_channel(id)

  self.revive_with(jch)
  self.recover_shutdown_hooks

  self.recover_prefetch_setting
  self.recover_confirm_mode
  self.recover_confirm_hooks
  self.recover_tx_mode
  self.recover_exchanges
  # # this includes bindings recovery
  self.recover_queues
  self.recover_consumers
  self.increment_recoveries_counter
end

#basic_ack(delivery_tag, multiple) ⇒ NilClass

Acknowledges one or more messages (deliveries).



888
889
890
891
892
# File 'lib/march_hare/channel.rb', line 888

def basic_ack(delivery_tag, multiple)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_ack(delivery_tag.to_i, multiple)
  end
end

#basic_cancel(consumer_tag) ⇒ Object



761
762
763
764
765
766
# File 'lib/march_hare/channel.rb', line 761

def basic_cancel(consumer_tag)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_cancel(consumer_tag)
  end
  self.unregister_consumer(consumer_tag)
end

#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object



747
748
749
750
751
752
753
754
755
756
757
758
759
# File 'lib/march_hare/channel.rb', line 747

def basic_consume(queue, auto_ack, consumer_tag = nil, consumer)
  consumer.auto_ack = auto_ack
  tag = converting_rjc_exceptions_to_ruby do
    if consumer_tag
      @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer)
    else
      @delegate.basic_consume(queue, auto_ack, consumer)
    end
  end
  self.register_consumer(tag, consumer)

  tag
end

#basic_get(queue, auto_ack) ⇒ Object



741
742
743
744
745
# File 'lib/march_hare/channel.rb', line 741

def basic_get(queue, auto_ack)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_get(queue, auto_ack)
  end
end

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.



904
905
906
907
908
# File 'lib/march_hare/channel.rb', line 904

def basic_nack(delivery_tag, multiple = false, requeue = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Options Hash (properties):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID



735
736
737
738
739
# File 'lib/march_hare/channel.rb', line 735

def basic_publish(exchange, routing_key, mandatory, properties, body)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
  end
end

#basic_qos(prefetch_count) ⇒ Object



768
769
770
771
772
773
774
775
# File 'lib/march_hare/channel.rb', line 768

def basic_qos(prefetch_count)
  r = converting_rjc_exceptions_to_ruby do
    @delegate.basic_qos(prefetch_count)
  end
  @prefetch_count = prefetch_count

  r
end

#basic_recover(requeue = true) ⇒ Object

Redeliver unacknowledged messages



914
915
916
917
918
# File 'lib/march_hare/channel.rb', line 914

def basic_recover(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover(requeue)
  end
end

#basic_recover_async(requeue = true) ⇒ Object

Redeliver unacknowledged messages



924
925
926
927
928
# File 'lib/march_hare/channel.rb', line 924

def basic_recover_async(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover_async(requeue)
  end
end

#basic_reject(delivery_tag, requeue) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, true)
end

Reject a message

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

See Also:



875
876
877
878
879
# File 'lib/march_hare/channel.rb', line 875

def basic_reject(delivery_tag, requeue)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_reject(delivery_tag.to_i, requeue)
  end
end

#cancel_consumers_before_closing!Object



214
215
216
# File 'lib/march_hare/channel.rb', line 214

def cancel_consumers_before_closing!
  @cancel_consumers_before_closing = true
end

#cancel_consumers_before_closing?Boolean



218
219
220
# File 'lib/march_hare/channel.rb', line 218

def cancel_consumers_before_closing?
  !!@cancel_consumers_before_closing
end

#channel_numberInteger Also known as: id, number



158
159
160
# File 'lib/march_hare/channel.rb', line 158

def channel_number
  @delegate.channel_number
end

#close(code = 200, reason = "Goodbye") ⇒ Object

Closes the channel.

Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/march_hare/channel.rb', line 178

def close(code = 200, reason = "Goodbye")
  # This is a best-effort attempt to cancel all consumers before closing the channel.
  # Retries are extremely unlikely to succeed, and the channel itself is about to be closed,
  # so we don't bother retrying.
  if self.cancel_consumers_before_closing?
   # cancelling a consumer involves using the same mutex, so avoid holding the lock
    keys = @consumers.keys
    keys.each do |ctag|
      begin
        self.basic_cancel(ctag)
      rescue Bunny::Exception
        # ignore
      rescue Bunny::ClientTimeout
        # ignore
      end
    end
  end

  v = @delegate.close(code, reason)

  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end

  @connection.unregister_channel(self)

  v
end

#closed?Boolean



169
170
171
# File 'lib/march_hare/channel.rb', line 169

def closed?
  !open?
end

#configure(&block) ⇒ Object



208
209
210
211
212
# File 'lib/march_hare/channel.rb', line 208

def configure(&block)
  block.call(self) if block_given?

  self
end

#confirm_selectNilClass

Enables publisher confirms on the channel.



937
938
939
940
941
942
# File 'lib/march_hare/channel.rb', line 937

def confirm_select
  converting_rjc_exceptions_to_ruby do
    @confirm_mode = true
    @delegate.confirm_select
  end
end

#converting_rjc_exceptions_to_ruby(&block) ⇒ Object

Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.



1119
1120
1121
1122
1123
1124
1125
# File 'lib/march_hare/channel.rb', line 1119

def converting_rjc_exceptions_to_ruby(&block)
  begin
    block.call
  rescue Exception, java.lang.Throwable => e
    Exceptions.convert_and_reraise(e)
  end
end

#default_exchangeObject

Provides access to the default exchange



453
454
455
# File 'lib/march_hare/channel.rb', line 453

def default_exchange
  @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
end

#deregister_exchange(exchange) ⇒ Object



1085
1086
1087
1088
# File 'lib/march_hare/channel.rb', line 1085

def deregister_exchange(exchange)
  logger.debug("channel: deregister exchange #{exchange.name}")
  @exchanges.delete(exchange.name)
end

#deregister_queue(queue) ⇒ Object



1063
1064
1065
1066
# File 'lib/march_hare/channel.rb', line 1063

def deregister_queue(queue)
  logger.debug("channel: deregister queue #{queue.name}")
  @queues.delete(queue.name)
end

#deregister_queue_named(name) ⇒ Object



1069
1070
1071
# File 'lib/march_hare/channel.rb', line 1069

def deregister_queue_named(name)
  @queues.delete(name)
end

#direct(name, opts = {}) ⇒ MarchHare::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

See Also:



401
402
403
404
405
406
407
# File 'lib/march_hare/channel.rb', line 401

def direct(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#durable_queue(name, type = Queue::Types::CLASSIC, opts = {}) ⇒ MarchHare::Queue

Declares a new server-named queue that is automatically deleted when the connection is closed.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

See Also:



585
586
587
588
589
590
591
592
593
594
595
596
597
598
# File 'lib/march_hare/channel.rb', line 585

def durable_queue(name, type = Queue::Types::CLASSIC, opts = {})
  throw ArgumentError.new("queue name must not be nil") if name.nil?
  throw ArgumentError.new("queue name must not be empty (server-named durable queues do not make sense)") if name.empty?

  final_opts = opts.merge({
    :type        => type,
    :durable     => true,
    # exclusive or auto-delete QQs do not make much sense
    :exclusive   => false,
    :auto_delete => false
  })

  self.queue(name, final_opts)
end

#exchange(name, options = {}) ⇒ MarchHare::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Options Hash (options):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or “x-consistent-hash”

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

See Also:



359
360
361
362
363
364
365
# File 'lib/march_hare/channel.rb', line 359

def exchange(name, options={})
  dx = Exchange.new(self, name, options).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object

Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)



485
486
487
488
489
# File 'lib/march_hare/channel.rb', line 485

def exchange_bind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_bind(destination, source, routing_key, arguments)
  end
end

#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object

Declares a echange using echange.declare AMQP 0.9.1 method.



468
469
470
471
472
# File 'lib/march_hare/channel.rb', line 468

def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
  end
end

#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object

Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)



502
503
504
505
506
# File 'lib/march_hare/channel.rb', line 502

def exchange_unbind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_unbind(destination, source, routing_key, arguments)
  end
end

#fanout(name, opts = {}) ⇒ MarchHare::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

See Also:



380
381
382
383
384
385
386
# File 'lib/march_hare/channel.rb', line 380

def fanout(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#find_queue(name) ⇒ Object



1080
1081
1082
# File 'lib/march_hare/channel.rb', line 1080

def find_queue(name)
  @queues[name]
end

#gracefully_shut_down_consumersObject



1109
1110
1111
1112
1113
# File 'lib/march_hare/channel.rb', line 1109

def gracefully_shut_down_consumers
  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end
end

#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object



1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
# File 'lib/march_hare/channel.rb', line 1128

def guarding_against_stale_delivery_tags(tag, &block)
  case tag
  # if a fixnum was passed, execute unconditionally. MK.
  when Integer then
    block.call
    # versioned delivery tags should be checked to avoid
    # sending out stale (invalid) tags after channel was reopened
    # during network failure recovery. MK.
  when VersionedDeliveryTag then
    if !tag.stale?(@recoveries_counter.get)
      block.call
    end
  end
end

#headers(name, opts = {}) ⇒ MarchHare::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

See Also:



443
444
445
446
447
448
449
# File 'lib/march_hare/channel.rb', line 443

def headers(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#increment_recoveries_counterObject



337
338
339
# File 'lib/march_hare/channel.rb', line 337

def increment_recoveries_counter
  @recoveries_counter.increment_and_get
end

#logger::Logger



153
154
155
# File 'lib/march_hare/channel.rb', line 153

def logger
  @connection.logger
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.



832
833
834
835
836
# File 'lib/march_hare/channel.rb', line 832

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#next_publisher_seq_noObject



968
969
970
# File 'lib/march_hare/channel.rb', line 968

def next_publisher_seq_no
  @delegate.next_publisher_seq_no
end

#on_confirm(&block) ⇒ Object

Defines a publisher confirm handler



1008
1009
1010
1011
1012
# File 'lib/march_hare/channel.rb', line 1008

def on_confirm(&block)
  ch = BlockConfirmListener.from(block)
  self.add_confirm_listener(ch)
  @confirm_hooks << ch
end

#on_return(&block) ⇒ Object

Defines a returned message handler.



1002
1003
1004
# File 'lib/march_hare/channel.rb', line 1002

def on_return(&block)
  self.add_return_listener(BlockReturnListener.from(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.



225
226
227
228
229
230
231
232
# File 'lib/march_hare/channel.rb', line 225

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)

  @shutdown_hooks << sh
  @delegate.add_shutdown_listener(sh)

  sh
end

#open?Boolean



165
166
167
# File 'lib/march_hare/channel.rb', line 165

def open?
  @delegate.open?
end

#prefetchInteger



792
793
794
# File 'lib/march_hare/channel.rb', line 792

def prefetch
  @prefetch_count || 0
end

#prefetch=(n) ⇒ Object

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages



787
788
789
# File 'lib/march_hare/channel.rb', line 787

def prefetch=(n)
  basic_qos(n)
end

#qos(options = {}) ⇒ Object



777
778
779
# File 'lib/march_hare/channel.rb', line 777

def qos(options={})
  basic_qos(options.fetch(:prefetch_count, 0))
end

#queue(name, options = {}) ⇒ MarchHare::Queue

Declares a queue or looks it up in the per-channel cache.

Options Hash (options):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto-delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



526
527
528
529
530
531
532
# File 'lib/march_hare/channel.rb', line 526

def queue(name, options = {})
  dq = Queue.new(self, name, options).tap do |q|
    q.declare!
  end

  self.register_queue(dq)
end

#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method



672
673
674
675
676
# File 'lib/march_hare/channel.rb', line 672

def queue_bind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_bind(queue, exchange, routing_key, arguments)
  end
end

#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object

Declares a queue using queue.declare AMQP 0.9.1 method.



628
629
630
631
632
# File 'lib/march_hare/channel.rb', line 628

def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
  end
end

#queue_declare_passive(name) ⇒ Object

Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.



640
641
642
643
644
# File 'lib/march_hare/channel.rb', line 640

def queue_declare_passive(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare_passive(name)
  end
end

#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object

Deletes a queue using queue.delete AMQP 0.9.1 method



655
656
657
658
659
# File 'lib/march_hare/channel.rb', line 655

def queue_delete(name, if_empty = false, if_unused = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_delete(name, if_empty, if_unused)
  end
end

#queue_purge(name) ⇒ Object

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.



701
702
703
704
705
# File 'lib/march_hare/channel.rb', line 701

def queue_purge(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_purge(name)
  end
end

#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method



689
690
691
692
693
# File 'lib/march_hare/channel.rb', line 689

def queue_unbind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_unbind(queue, exchange, routing_key, arguments)
  end
end

#quorum_queue(name, opts = {}) ⇒ MarchHare::Queue

Declares a new client-named quorum queue.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

See Also:



545
546
547
548
549
550
# File 'lib/march_hare/channel.rb', line 545

def quorum_queue(name, opts = {})
  throw ArgumentError.new("quorum queue name must not be nil") if name.nil?
  throw ArgumentError.new("quorum queue name must not be empty (server-named QQs do not make sense)") if name.empty?

  durable_queue(name, Queue::Types::QUORUM, opts)
end

#recover_confirm_hooksObject



267
268
269
270
271
# File 'lib/march_hare/channel.rb', line 267

def recover_confirm_hooks
  @confirm_hooks.each do |ch|
    @delegate.add_confirm_listener(ch)
  end
end

#recover_confirm_modeObject

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.



282
283
284
# File 'lib/march_hare/channel.rb', line 282

def recover_confirm_mode
  confirm_select if defined?(@confirm_mode) && @confirm_mode
end

#recover_consumersObject

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/march_hare/channel.rb', line 323

def recover_consumers
  @consumers.values.each do |c|
    begin
      logger.debug("channel: recover consumer #{c.consumer_tag}")
      self.unregister_consumer(c.consumer_tag)
      c.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering consumer #{c.consumer_tag}")
      logger.error(e)
    end
  end
end

#recover_exchangesObject

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



295
296
297
298
299
300
301
302
303
304
305
# File 'lib/march_hare/channel.rb', line 295

def recover_exchanges
  @exchanges.values.each do |x|
    begin
      logger.debug("channel: recover exchange #{x.name}")
      x.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering exchange #{x.name}")
      logger.error(e)
    end
  end
end

#recover_prefetch_settingObject

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



276
277
278
# File 'lib/march_hare/channel.rb', line 276

def recover_prefetch_setting
  basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
end

#recover_queuesObject

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



309
310
311
312
313
314
315
316
317
318
319
# File 'lib/march_hare/channel.rb', line 309

def recover_queues
  @queues.values.each do |q|
    begin
      logger.debug("channel: recover queue #{q.name}")
      q.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering queue #{q.name}")
      logger.error(e)
    end
  end
end

#recover_shutdown_hooksObject



260
261
262
263
264
# File 'lib/march_hare/channel.rb', line 260

def recover_shutdown_hooks
  @shutdown_hooks.each do |sh|
    @delegate.add_shutdown_listener(sh)
  end
end

#recover_tx_modeObject

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.



288
289
290
# File 'lib/march_hare/channel.rb', line 288

def recover_tx_mode
  tx_select if defined?(@tx_mode) && @tx_mode
end

#register_consumer(consumer_tag, consumer) ⇒ Object



1097
1098
1099
1100
# File 'lib/march_hare/channel.rb', line 1097

def register_consumer(consumer_tag, consumer)
  logger.debug("channel: register consumer #{consumer_tag}")
  @consumers[consumer_tag] = consumer
end

#register_exchange(exchange) ⇒ Object



1091
1092
1093
1094
# File 'lib/march_hare/channel.rb', line 1091

def register_exchange(exchange)
  logger.debug("channel: register exchange #{exchange.name}")
  @exchanges[exchange.name] = exchange
end

#register_queue(queue) ⇒ Object



1074
1075
1076
1077
# File 'lib/march_hare/channel.rb', line 1074

def register_queue(queue)
  logger.debug("channel: register queue #{queue.name}")
  @queues[queue.name] = queue
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.



817
818
819
820
821
# File 'lib/march_hare/channel.rb', line 817

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end

#revive_with(java_ch) ⇒ Object



255
256
257
# File 'lib/march_hare/channel.rb', line 255

def revive_with(java_ch)
  @delegate = java_ch
end

#sessionMarchHare::Session Also known as: client, connection



146
147
148
# File 'lib/march_hare/channel.rb', line 146

def session
  @connection
end

#stream(name, opts = {}) ⇒ MarchHare::Queue

Declares a new client-named stream (that Bunny can use as if it was a queue). Note that Bunny would still use AMQP 0-9-1 to perform operations on this “queue”. To use stream-specific operations and to gain from stream protocol efficiency and partitioning, use a Ruby client for the RabbitMQ stream protocol.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

See Also:



567
568
569
570
571
572
# File 'lib/march_hare/channel.rb', line 567

def stream(name, opts = {})
  throw ArgumentError.new("stream name must not be nil") if name.nil?
  throw ArgumentError.new("stream name must not be empty (server-named QQs do not make sense)") if name.empty?

  durable_queue(name, Queue::Types::STREAM, opts)
end

#temporary_queue(opts = {}) ⇒ MarchHare::Queue

Declares a new server-named queue that is automatically deleted when the connection is closed.

See Also:



606
607
608
609
610
611
# File 'lib/march_hare/channel.rb', line 606

def temporary_queue(opts = {})
  temporary_queue_opts = {
    :exclusive => true
  }
  queue("", opts.merge(temporary_queue_opts))
end

#topic(name, opts = {}) ⇒ MarchHare::Exchange

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

See Also:



422
423
424
425
426
427
428
# File 'lib/march_hare/channel.rb', line 422

def topic(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#tx_commitObject

Commits a transaction



987
988
989
990
991
# File 'lib/march_hare/channel.rb', line 987

def tx_commit
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_commit
  end
end

#tx_rollbackObject

Rolls back a transaction



994
995
996
997
998
# File 'lib/march_hare/channel.rb', line 994

def tx_rollback
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_rollback
  end
end

#tx_selectObject

Enables transactions on the channel



973
974
975
976
977
978
# File 'lib/march_hare/channel.rb', line 973

def tx_select
  converting_rjc_exceptions_to_ruby do
    @tx_mode = true
    @delegate.tx_select
  end
end

#unregister_consumer(consumer_tag) ⇒ Object



1103
1104
1105
1106
# File 'lib/march_hare/channel.rb', line 1103

def unregister_consumer(consumer_tag)
  logger.debug("channel: unregister consumer #{consumer_tag}")
  @consumers.delete(consumer_tag)
end

#using_publisher_confirms?Boolean Also known as: uses_publisher_confirms?



945
946
947
# File 'lib/march_hare/channel.rb', line 945

def using_publisher_confirms?
  !!@confirm_mode
end

#using_tx?Boolean Also known as: uses_tx?



981
982
983
# File 'lib/march_hare/channel.rb', line 981

def using_tx?
  !!@tx_mode
end

#wait_for_confirms(timeout = nil) ⇒ Boolean

Waits until all outstanding publisher confirms arrive.

Takes an optional timeout in milliseconds. Will raise an exception in case a timeout has occured.



958
959
960
961
962
963
964
965
966
# File 'lib/march_hare/channel.rb', line 958

def wait_for_confirms(timeout = nil)
  if timeout
    converting_rjc_exceptions_to_ruby do
      @delegate.wait_for_confirms(timeout)
    end
  else
    @delegate.wait_for_confirms
  end
end