Class: Bunny::Channel

Inherits:
Object
Defined in:
lib/bunny/channel.rb

Overview

Channels in RabbitMQ

To quote the AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening Channels

Channels can be opened either via Bunny::Session#create_channel (sufficient in the majority of cases) or by instantiating Bunny::Channel directly:

conn = Bunny.new
conn.start

ch   = conn.create_channel

This will automatically allocate a channel id.

Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

ch = conn.create_channel
ch.close

Higher-level API

Bunny offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

Exchange Operations In Higher-level API

Channel Qos (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
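As a rough illustration of what the prefetch level means, the sketch below models how many more messages the server may push to a channel. It is a plain-Ruby model, not Bunny code: the `deliverable` helper and its parameters are hypothetical, and it assumes the AMQP 0.9.1 convention that a prefetch count of 0 means "no limit".

```ruby
# Hypothetical model of prefetch accounting (not part of Bunny).
# queue_depth: messages ready in the queue
# unacked:     deliveries on this channel not yet acknowledged or rejected
# prefetch:    value passed to basic.qos; 0 is treated as "no limit"
def deliverable(queue_depth, unacked, prefetch)
  return queue_depth if prefetch.zero?

  # Remaining room in the prefetch window, never negative.
  window = [prefetch - unacked, 0].max
  [window, queue_depth].min
end

puts deliverable(10, 3, 5)
puts deliverable(10, 5, 5)
puts deliverable(10, 3, 0)
```

Once the window is full, nothing more is delivered until the consumer acknowledges or rejects at least one outstanding message.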

Channel IDs

Channels are identified by their ids which are integers. Bunny takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

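There is a limit on the maximum number of channels per connection. Channel ids are 16-bit in AMQP 0.9.1, so the limit can be at most 65535, and the value negotiated with the server may be lower. Note that allocating channels is very cheap on both client and server, so having tens, hundreds or even thousands of channels is not a problem.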
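To make the allocate-and-release cycle concrete, here is a minimal sketch of an id allocator. The `ChannelIdAllocator` class is hypothetical and is not how Bunny implements allocation internally; it only shows that released ids become available again and that allocation fails once the limit is reached.

```ruby
require "set"

# Hypothetical allocator for illustration; not Bunny's implementation.
class ChannelIdAllocator
  def initialize(max = 65_535)
    @max  = max
    @used = Set.new
  end

  # Returns the lowest free id in 1..max, or nil when all ids are taken.
  def allocate
    id = (1..@max).find { |n| !@used.include?(n) }
    @used << id if id
    id
  end

  # Makes the id available for reuse.
  def release(id)
    @used.delete(id)
    nil
  end
end

allocator = ChannelIdAllocator.new(3)
first  = allocator.allocate
second = allocator.allocate
allocator.release(first)
reused = allocator.allocate
```

A linear scan is enough for a sketch; a real client would likely use a more efficient structure.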

Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With Bunny, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying channel.close method information.

Examples:

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue Bunny::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "bunny.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue Bunny::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze
SHORTSTR_LIMIT =
255

Instance Attribute Summary

Backwards compatibility with 0.8.0

Higher-level API for exchange operations

Higher-level API for queue operations

QoS and Flow Control

Message acknowledgements

Consumer and Message operations (basic.*)

Queue operations (queue.*)

Exchange operations (exchange.*)

Flow control (channel.*)

Transactions (tx.*)

Publisher Confirms (confirm.*)

Misc

Network Failure Recovery

Instance Method Summary

Constructor Details

- (Channel) initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))



# File 'lib/bunny/channel.rb', line 166

def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
  @connection = connection
  @logger     = connection.logger
  @id         = id || @connection.next_channel_id
  @status     = :opening

  @connection.register_channel(self)

  @queues     = Hash.new
  @exchanges  = Hash.new
  @consumers  = Hash.new
  @work_pool  = work_pool

  # synchronizes frameset delivery. MK.
  @publishing_mutex = @connection.mutex_impl.new
  @consumer_mutex   = @connection.mutex_impl.new

  @unconfirmed_set_mutex = @connection.mutex_impl.new

  self.reset_continuations

  # threads awaiting on continuations. Used to unblock
  # them when network connection goes down so that busy loops
  # that perform synchronous operations can work. MK.
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @next_publish_seq_no = 0
  @delivery_tag_offset = 0

  @recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0)
  @uncaught_exception_handler = Proc.new do |e, consumer|
    @logger.error "Uncaught exception from consumer #{consumer.to_s}: #{e.inspect} @ #{e.backtrace[0]}"
  end
end

Instance Attribute Details

- (Bunny::Session) connection (readonly)



# File 'lib/bunny/channel.rb', line 139

def connection
  @connection
end

- (Hash<String, Bunny::Consumer>) consumers (readonly)



# File 'lib/bunny/channel.rb', line 155

def consumers
  @consumers
end

- (Hash<String, Bunny::Exchange>) exchanges (readonly)



# File 'lib/bunny/channel.rb', line 149

def exchanges
  @exchanges
end

- (Integer) id



# File 'lib/bunny/channel.rb', line 137

def id
  @id
end

- (Set<Integer>) nacked_set (readonly)



# File 'lib/bunny/channel.rb', line 153

def nacked_set
  @nacked_set
end

- (Integer) next_publish_seq_no (readonly)



# File 'lib/bunny/channel.rb', line 145

def next_publish_seq_no
  @next_publish_seq_no
end

- (Integer) prefetch_count (readonly)



# File 'lib/bunny/channel.rb', line 158

def prefetch_count
  @prefetch_count
end

- (Hash<String, Bunny::Queue>) queues (readonly)



# File 'lib/bunny/channel.rb', line 147

def queues
  @queues
end

- (Object) recoveries_counter (readonly)

Returns the value of attribute recoveries_counter



# File 'lib/bunny/channel.rb', line 203

def recoveries_counter
  @recoveries_counter
end

- (Symbol) status (readonly)



# File 'lib/bunny/channel.rb', line 141

def status
  @status
end

- (Set<Integer>) unconfirmed_set (readonly)



# File 'lib/bunny/channel.rb', line 151

def unconfirmed_set
  @unconfirmed_set
end

- (Bunny::ConsumerWorkPool) work_pool (readonly)



# File 'lib/bunny/channel.rb', line 143

def work_pool
  @work_pool
end

Instance Method Details

- (Object) ack(delivery_tag, multiple = false) Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.



# File 'lib/bunny/channel.rb', line 474

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end

- (Boolean) active



# File 'lib/bunny/channel.rb', line 259

def active
  open?
end

- (Boolean) any_consumers?

Returns true if there are consumers on this channel



# File 'lib/bunny/channel.rb', line 949

def any_consumers?
  @consumer_mutex.synchronize { @consumers.any? }
end

- (NilClass) basic_ack(delivery_tag, multiple)

Acknowledges a delivery (message).

Examples:

Ack a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# q is assumed to be a Bunny::Queue declared on ch
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # acknowledge the message
  ch.basic_ack(delivery_info.delivery_tag, false)
end

Ack a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# basic_ack requires the multiple argument; false acks only this delivery
ch.basic_ack(delivery_info.delivery_tag, false)

Ack multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# ack all fetched messages up to payload3
ch.basic_ack(delivery_info.delivery_tag, true)
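
The effect of the multiple flag in the last example can be pictured with a small plain-Ruby model. The `ack_outstanding` helper is hypothetical and only mimics the bookkeeping described by AMQP 0.9.1: with multiple set to true, every outstanding delivery tag up to and including the given one is acknowledged.

```ruby
require "set"

# Hypothetical model of acknowledgement bookkeeping (not part of Bunny).
# Returns the set of delivery tags that are still unacknowledged.
def ack_outstanding(outstanding, delivery_tag, multiple)
  if multiple
    # Acknowledge every tag up to and including delivery_tag.
    Set.new(outstanding.reject { |tag| tag <= delivery_tag })
  else
    # Acknowledge only the given tag.
    outstanding - [delivery_tag]
  end
end

outstanding = Set[1, 2, 3, 4]
after_multiple = ack_outstanding(outstanding, 3, true)
after_single   = ack_outstanding(outstanding, 3, false)
```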

# File 'lib/bunny/channel.rb', line 744

def basic_ack(delivery_tag, multiple)
  raise_if_no_longer_open!
  @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))

  nil
end

- (AMQ::Protocol::Basic::CancelOk) basic_cancel(consumer_tag)

Removes a consumer. Messages for this consumer will no longer be delivered. If the queue it was on is auto-deleted and this consumer was the last one, the queue will be deleted.



# File 'lib/bunny/channel.rb', line 935

def basic_cancel(consumer_tag)
  @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_cancel_ok = wait_on_continuations
  end

  maybe_kill_consumer_work_pool! unless any_consumers?

  @last_basic_cancel_ok
end

- (AMQ::Protocol::Basic::ConsumeOk) basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block) Also known as: consume

Registers a consumer for queue. Delivered messages will be handled with the block provided to this method.



# File 'lib/bunny/channel.rb', line 827

def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  queue_name = if queue.respond_to?(:name)
                 queue.name
               else
                 queue
               end

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      queue_name,
      consumer_tag,
      false,
      no_ack,
      exclusive,
      false,
      arguments))

  begin
    Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)

  @last_basic_consume_ok
end
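
The listing above accepts either a queue object or a queue name, and the same duck-typing check appears in several other methods on this page. A minimal sketch of that idea follows; `resolve_name` is a hypothetical helper, and the Struct stands in for a Bunny::Queue.

```ruby
# Hypothetical helper mirroring the respond_to?(:name) check in the listing.
def resolve_name(queue_or_name)
  if queue_or_name.respond_to?(:name)
    queue_or_name.name
  else
    queue_or_name
  end
end

# Stand-in for an object that exposes #name, such as a queue.
fake_queue = Struct.new(:name).new("bunny.examples.queue1")

from_object = resolve_name(fake_queue)
from_string = resolve_name("bunny.examples.queue2")
```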

- (AMQ::Protocol::Basic::ConsumeOk) basic_consume_with(consumer) Also known as: consume_with

Registers a Bunny::Consumer instance as a consumer for its queue.



# File 'lib/bunny/channel.rb', line 883

def basic_consume_with(consumer)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    register_consumer(consumer.consumer_tag, consumer)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      consumer.queue_name,
      consumer.consumer_tag,
      false,
      consumer.no_ack,
      consumer.exclusive,
      false,
      consumer.arguments))

  begin
    Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  register_consumer(@last_basic_consume_ok.consumer_tag, consumer)

  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_consume_ok
end

- (Array) basic_get(queue, opts = {:manual_ack => true})

Synchronously fetches a message from the queue, if there is one. This method is for cases when the convenience of synchronous operations is more important than throughput.

Examples:

Using Bunny::Channel#basic_get with manual acknowledgements

conn = Bunny.new
conn.start
ch   = conn.create_channel
# here we assume the queue already exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :manual_ack => true)
ch.acknowledge(delivery_info.delivery_tag)

Options Hash (opts):

  • :ack (Boolean) — default: true

    [DEPRECATED] Use :manual_ack instead

  • :manual_ack (Boolean) — default: true

    Will this message be acknowledged manually?

# File 'lib/bunny/channel.rb', line 591

def basic_get(queue, opts = {:manual_ack => true})
  raise_if_no_longer_open!

  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:manual_ack])))
  # this is a workaround for the edge case when basic_get is called in a tight loop
  # and network goes down we need to perform recovery. The problem is, basic_get will
  # keep blocking the thread that calls it without clear way to constantly unblock it
  # from the network activity loop (where recovery happens) with the current continuations
  # implementation (and even more correct and convenient ones, such as wait/notify, should
  # we implement them). So we return a triple of nils immediately which apps should be
  # able to handle anyway as "got no message, no need to act". MK.
  @last_basic_get_response = if @connection.open?
                               wait_on_basic_get_continuations
                             else
                               [nil, nil, nil]
                             end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_basic_get_response
end
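
As the comment in the listing notes, callers should be prepared for a triple of nils. A minimal sketch of handling that case is shown below; `handle_get_response` is a hypothetical helper that works on any three-element array, so it can be tried without a broker.

```ruby
# Hypothetical helper: interprets the triple returned by basic_get.
def handle_get_response(response)
  delivery_info, _properties, payload = response

  # A nil delivery_info means there was no message to process.
  return :empty if delivery_info.nil?

  [:message, payload]
end

empty_result   = handle_get_response([nil, nil, nil])
message_result = handle_get_response([Object.new, {}, "hello"])
```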

- (NilClass) basic_nack(delivery_tag, multiple = false, requeue = false)

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Examples:

Requeue a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# q is assumed to be a Bunny::Queue declared on ch
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_nack(delivery_info.delivery_tag, false, true)
end

Reject a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# q is assumed to be a Bunny::Queue declared on ch
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message without requeueing it
  ch.basic_nack(delivery_info.delivery_tag)
end

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_nack(delivery_info.delivery_tag, false, true)

Requeue multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# requeue all fetched messages up to payload3
ch.basic_nack(delivery_info.delivery_tag, true, true)

# File 'lib/bunny/channel.rb', line 804

def basic_nack(delivery_tag, multiple = false, requeue = false)
  raise_if_no_longer_open!
  @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
      delivery_tag,
      multiple,
      requeue))

  nil
end

- (Bunny::Channel) basic_publish(payload, exchange, routing_key, opts = {})

Publishes a message using basic.publish AMQP 0.9.1 method.

Options Hash (opts):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Raises:

  • (ArgumentError)


# File 'lib/bunny/channel.rb', line 529

def basic_publish(payload, exchange, routing_key, opts = {})
  raise_if_no_longer_open!
  raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  mode = if opts.fetch(:persistent, true)
           2
         else
           1
         end

  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0

  if @next_publish_seq_no > 0
    @unconfirmed_set_mutex.synchronize do
      @unconfirmed_set.add(@next_publish_seq_no)
      @next_publish_seq_no += 1
    end
  end

  frames = AMQ::Protocol::Basic::Publish.encode(@id,
    payload,
    opts,
    exchange_name,
    routing_key,
    opts[:mandatory],
    false,
    @connection.frame_max)
  @connection.send_frameset_without_timeout(frames, self)

  self
end
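
The defaults applied in the listing above can be tried in isolation. The sketch below is a plain-Ruby restatement, not Bunny code: `publish_properties` is a hypothetical helper, and unlike the listing it copies the options hash instead of modifying it in place.

```ruby
# Hypothetical helper restating the validation and defaults from the listing.
# limit mirrors SHORTSTR_LIMIT (255).
def publish_properties(routing_key, opts = {}, limit = 255)
  if routing_key && routing_key.size > limit
    raise ArgumentError, "routing key cannot be longer than #{limit} characters"
  end

  props = opts.dup
  # Persistent (delivery mode 2) unless :persistent is explicitly false.
  props[:delivery_mode] ||= opts.fetch(:persistent, true) ? 2 : 1
  props[:content_type]  ||= "application/octet-stream"
  props[:priority]      ||= 0
  props
end

defaults  = publish_properties("orders.created")
transient = publish_properties("orders.created", :persistent => false)
```

Note that an explicit :delivery_mode takes precedence over :persistent, since the default is only applied when no delivery mode is given.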

- (AMQ::Protocol::Basic::QosOk) basic_qos(count, global = false)

Controls message delivery rate using basic.qos AMQP 0.9.1 method.

Raises:

  • (ArgumentError)

# File 'lib/bunny/channel.rb', line 626

def basic_qos(count, global = false)
  raise ArgumentError.new("prefetch count must be a non-negative integer, given: #{count}") if count < 0
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, count, global))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_qos_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @prefetch_count = count

  @last_basic_qos_ok
end

- (AMQ::Protocol::Basic::RecoverOk) basic_recover(requeue)

Redeliver unacknowledged messages



# File 'lib/bunny/channel.rb', line 647

def basic_recover(requeue)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_recover_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_recover_ok
end

- (NilClass) basic_reject(delivery_tag, requeue)

Rejects or requeues a message.

Examples:

Requeue a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# q is assumed to be a Bunny::Queue declared on ch
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, true)
end

Reject a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# q is assumed to be a Bunny::Queue declared on ch
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message without requeueing it
  ch.basic_reject(delivery_info.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

# File 'lib/bunny/channel.rb', line 697

def basic_reject(delivery_tag, requeue)
  raise_if_no_longer_open!
  @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))

  nil
end

- (AMQ::Protocol::Channel::FlowOk) channel_flow(active)

Note:

Recent RabbitMQ versions (e.g. 2.8.x, 3.x) employ TCP/IP-level back pressure on publishers if they detect that consumers do not keep up with them.

Enables or disables message flow for the channel. When message flow is disabled, no new messages will be delivered to consumers on this channel. This is typically used by consumers that cannot keep up with the influx of messages.



# File 'lib/bunny/channel.rb', line 1279

def channel_flow(active)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_channel_flow_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_channel_flow_ok
end

- (Bunny::Session) client



# File 'lib/bunny/channel.rb', line 264

def client
  @connection
end

- (Object) close

Closes the channel. Closed channels can no longer be used (this includes associated Queue, Exchange and Bunny::Consumer instances).



# File 'lib/bunny/channel.rb', line 230

def close
  @connection.close_channel(self)
  closed!
  maybe_kill_consumer_work_pool!
end

- (Boolean) closed?

Returns true if this channel is closed (manually or because of an exception), false otherwise



# File 'lib/bunny/channel.rb', line 244

def closed?
  @status == :closed
end

- (AMQ::Protocol::Confirm::SelectOk) confirm_select(callback = nil)

Enables publisher confirms for the channel.



# File 'lib/bunny/channel.rb', line 1368

def confirm_select(callback=nil)
  raise_if_no_longer_open!

  if @next_publish_seq_no == 0
    @confirms_continuations = new_continuation
    @unconfirmed_set        = Set.new
    @nacked_set             = Set.new
    @next_publish_seq_no    = 1
    @only_acks_received = true
  end

  @confirms_callback = callback

  @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_confirm_select_ok = wait_on_continuations
  end
  @confirm_mode = true
  raise_if_continuation_resulted_in_a_channel_error!
  @last_confirm_select_ok
end

- (Object) default_exchange

Provides access to the default exchange



# File 'lib/bunny/channel.rb', line 357

def default_exchange
  self.direct(AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

- (Bunny::Exchange) direct(name, opts = {})

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/bunny/channel.rb', line 314

def direct(name, opts = {})
  Exchange.new(self, :direct, name, opts)
end

- (Bunny::Exchange) exchange(name, opts = {})

Declares an exchange of the given type (:direct by default) or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or "x-consistent-hash"

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

# File 'lib/bunny/channel.rb', line 375

def exchange(name, opts = {})
  Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end

- (AMQ::Protocol::Exchange::BindOk) exchange_bind(source, destination, opts = {})

Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

# File 'lib/bunny/channel.rb', line 1190

def exchange_bind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_bind_ok
end

- (AMQ::Protocol::Exchange::DeclareOk) exchange_declare(name, type, opts = {})

Declares an exchange using the exchange.declare AMQP 0.9.1 method.

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this exchange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) — default: false

    Should this exchange be deleted when it is no longer used?

  • passive (Boolean) — default: false

    If true, exchange will be checked for existence. If it does not exist, NotFound will be raised.

# File 'lib/bunny/channel.rb', line 1130

def exchange_declare(name, type, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id,
      name,
      type.to_s,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:auto_delete, false),
      opts.fetch(:internal, false),
      false, # nowait
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_declare_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_declare_ok
end

- (AMQ::Protocol::Exchange::DeleteOk) exchange_delete(name, opts = {})

Deletes an exchange using the exchange.delete AMQP 0.9.1 method.

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used

# File 'lib/bunny/channel.rb', line 1160

def exchange_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id,
      name,
      opts[:if_unused],
      false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_delete_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_delete_ok
end

- (AMQ::Protocol::Exchange::UnbindOk) exchange_unbind(source, destination, opts = {})

Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

# File 'lib/bunny/channel.rb', line 1234

def exchange_unbind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_unbind_ok
end

- (Bunny::Exchange) fanout(name, opts = {})

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/bunny/channel.rb', line 296

def fanout(name, opts = {})
  Exchange.new(self, :fanout, name, opts)
end

- (Object) flow(active)

Flow control. When set to false, RabbitMQ will stop delivering messages on this channel.



# File 'lib/bunny/channel.rb', line 435

def flow(active)
  channel_flow(active)
end

- (String) generate_consumer_tag(name = "bunny")

Generates a unique string intended to be used as a consumer tag.



# File 'lib/bunny/channel.rb', line 1420

def generate_consumer_tag(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
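
To make the tag format concrete, here is a standalone copy of the same string interpolation, runnable without a connection. sketch_consumer_tag is a hypothetical name; it only reproduces the expression shown in the listing above.

```ruby
# Standalone copy of the tag format, for illustration only.
def sketch_consumer_tag(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

tag = sketch_consumer_tag("worker")

# The tag has three dash-separated parts: prefix, timestamp, random number.
prefix, timestamp, random_part = tag.split("-")
puts prefix
puts timestamp.end_with?("000")
puts random_part.to_i < 999_999_999_999
```

Because the timestamp is whole seconds multiplied by 1000, its last three digits are always zero; uniqueness within the same second rests on the random component.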

- (Bunny::Exchange) headers(name, opts = {})

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

# File 'lib/bunny/channel.rb', line 350

def headers(name, opts = {})
  Exchange.new(self, :headers, name, opts)
end

- (Object) nack(delivery_tag, multiple = false, requeue = false)

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.



# File 'lib/bunny/channel.rb', line 491

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end
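
The multiple flag follows the usual AMQP 0.9.1 acknowledgement semantics: when it is true, the operation covers every outstanding delivery tag up to and including the given one. The sketch below models only that bookkeeping on a plain array. tags_affected is a hypothetical helper, and nothing here talks to RabbitMQ.

```ruby
# Hypothetical helper: which outstanding delivery tags does one nack cover?
def tags_affected(outstanding, delivery_tag, multiple)
  if multiple
    # multiple = true: everything up to and including delivery_tag
    outstanding.select { |tag| tag <= delivery_tag }
  else
    # multiple = false: only the tag itself
    outstanding.select { |tag| tag == delivery_tag }
  end
end

outstanding = [1, 2, 3, 4, 5]
p tags_affected(outstanding, 3, false)
p tags_affected(outstanding, 3, true)
```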

- (Integer) number



# File 'lib/bunny/channel.rb', line 254

def number
  self.id
end

- (Object) on_error(&block)

Defines a handler for errors that are not responses to a particular operation (e.g. basic.ack, basic.reject, basic.nack).



# File 'lib/bunny/channel.rb', line 1435

def on_error(&block)
  @on_error = block
end

- (Object) on_uncaught_exception(&block)

Defines a handler for uncaught exceptions in consumers (e.g. delivered message handlers).



# File 'lib/bunny/channel.rb', line 1443

def on_uncaught_exception(&block)
  @uncaught_exception_handler = block
end

- (Bunny::Channel) open

Opens the channel and resets its internal state



# File 'lib/bunny/channel.rb', line 213

def open
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @connection.open_channel(self)
  # clear last channel error
  @last_channel_error = nil

  @status = :open

  self
end

- (Boolean) open?

Returns true if this channel is open, false otherwise



# File 'lib/bunny/channel.rb', line 238

def open?
  @status == :open
end

- (Object) prefetch(count)

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages



# File 'lib/bunny/channel.rb', line 426

def prefetch(count)
  self.basic_qos(count, false)
end

- (Bunny::Queue) queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})

Declares a queue or looks it up in the per-channel cache.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (usable only by this connection and removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/bunny/channel.rb', line 398

def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
  q = find_queue(name) || Bunny::Queue.new(self, name, opts)

  register_queue(q)
end
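
The lookup-then-create pattern above means a second call with the same name returns the cached object rather than building a new one. A simplified, self-contained model of that behaviour follows. ChannelSketch and QueueStub are hypothetical, and the sketch assumes the cache is keyed by queue name, which the listing above does not show.

```ruby
# Hypothetical, simplified model of a per-channel queue cache.
QueueStub = Struct.new(:name, :opts)

class ChannelSketch
  def initialize
    @queues = {}
  end

  # Return the cached queue for this name, or build and cache a new one.
  def queue(name, opts = {})
    q = @queues[name] || QueueStub.new(name, opts)
    @queues[q.name] = q
  end
end

ch     = ChannelSketch.new
first  = ch.queue("tasks", :durable => true)
second = ch.queue("tasks")
puts first.equal?(second)
```

In this model the options passed on the second call are not applied, because the cached object wins.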

- (AMQ::Protocol::Queue::BindOk) queue_bind(name, exchange, opts = {})

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method

Options Hash (opts):

  • :routing_key (String) — default: nil

    Routing key used for binding (:key is accepted as an alias)

  • :arguments (Hash) — default: {}

    Optional arguments

# File 'lib/bunny/channel.rb', line 1053

def queue_bind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id,
      name,
      exchange_name,
      (opts[:routing_key] || opts[:key]),
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_bind_ok
end

- (AMQ::Protocol::Queue::DeclareOk) queue_declare(name, opts = {})

Declares a queue using queue.declare AMQP 0.9.1 method.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • :auto_delete (Boolean) — default: false

    Should this queue be deleted when the last consumer is cancelled?

  • :exclusive (Boolean) — default: false

    Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed.

  • :passive (Boolean) — default: false

    If true, queue will be checked for existence. If it does not exist, NotFound will be raised.

# File 'lib/bunny/channel.rb', line 975

def queue_declare(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id,
      name,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:exclusive, false),
      opts.fetch(:auto_delete, false),
      false,
      opts[:arguments]))
  @last_queue_declare_ok = wait_on_continuations

  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_declare_ok
end
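
queue_declare reads its boolean flags with Hash#fetch and an explicit default, so an omitted option is encoded as false rather than nil. The difference from plain index access can be seen in isolation with a short snippet; the opts hash is made up for illustration.

```ruby
# Only :durable is supplied by the caller in this example.
opts = { :durable => true }

puts opts.fetch(:durable, false).inspect    # key present: stored value is used
puts opts.fetch(:exclusive, false).inspect  # key absent: the default is used
puts opts[:exclusive].inspect               # plain lookup of an absent key
```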

- (AMQ::Protocol::Queue::DeleteOk) queue_delete(name, opts = {})

Deletes a queue using queue.delete AMQP 0.9.1 method

Options Hash (opts):

  • :if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • :if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

# File 'lib/bunny/channel.rb', line 1004

def queue_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id,
      name,
      opts[:if_unused],
      opts[:if_empty],
      false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_delete_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_delete_ok
end

- (AMQ::Protocol::Queue::PurgeOk) queue_purge(name, opts = {})

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.



# File 'lib/bunny/channel.rb', line 1027

def queue_purge(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_purge_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_purge_ok
end

- (AMQ::Protocol::Queue::UnbindOk) queue_unbind(name, exchange, opts = {})

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method

Options Hash (opts):

  • :routing_key (String) — default: nil

    Routing key of the binding to remove

  • :arguments (Hash) — default: {}

    Optional arguments

# File 'lib/bunny/channel.rb', line 1089

def queue_unbind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id,
      name,
      exchange_name,
      opts[:routing_key],
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_unbind_ok
end

- (Object) recover(ignored = true)

Tells RabbitMQ to redeliver unacknowledged messages. The argument is ignored: RabbitMQ only supports basic.recover with requeue = true.



# File 'lib/bunny/channel.rb', line 441

def recover(ignored = true)
  # RabbitMQ only supports basic.recover with requeue = true
  basic_recover(true)
end

- (Object) recover_cancelled_consumers!



# File 'lib/bunny/channel.rb', line 1539

def recover_cancelled_consumers!
  @recover_cancelled_consumers = true
end

- (Object) recover_confirm_mode

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1483

def recover_confirm_mode
  if using_publisher_confirmations?
    @delivery_tag_offset = @next_publish_seq_no - 1
    confirm_select(@confirms_callback)
  end
end

- (Object) recover_consumers

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1523

def recover_consumers
  unless @consumers.empty?
    @work_pool = ConsumerWorkPool.new(@work_pool.size)
    @work_pool.start
  end
  @consumers.values.dup.each do |c|
    c.recover_from_network_failure
  end
end

- (Object) recover_exchanges

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1502

def recover_exchanges
  @exchanges.values.dup.each do |x|
    x.recover_from_network_failure
  end
end

- (Object) recover_from_network_failure

Recovers the basic.qos setting, publisher confirms mode, transaction mode, exchanges, queues (including bindings) and consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1457

def recover_from_network_failure
  @logger.debug "Recovering channel #{@id} after network failure"
  release_all_continuations

  recover_prefetch_setting
  recover_confirm_mode
  recover_tx_mode
  recover_exchanges
  # this includes recovering bindings
  recover_queues
  recover_consumers
  increment_recoveries_counter
end

- (Object) recover_prefetch_setting

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1475

def recover_prefetch_setting
  basic_qos(@prefetch_count) if @prefetch_count
end

- (Object) recover_queues

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1512

def recover_queues
  @queues.values.dup.each do |q|
    @logger.debug "Recovering queue #{q.name}"
    q.recover_from_network_failure
  end
end

- (Object) recover_tx_mode

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1494

def recover_tx_mode
  tx_select if @tx_mode
end

- (Boolean) recovers_cancelled_consumers?



# File 'lib/bunny/channel.rb', line 1544

def recovers_cancelled_consumers?
  !!@recover_cancelled_consumers
end
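
The double negation in the predicate above coerces an instance variable that may never have been assigned into a strict true or false. A small standalone model of the flag-and-predicate pair follows. FlagSketch is hypothetical and exists only to show the idiom.

```ruby
# Hypothetical class that mirrors the flag-and-predicate pair above.
class FlagSketch
  def enable!
    @enabled = true
  end

  # An unset instance variable reads as nil; !! turns that into false.
  def enabled?
    !!@enabled
  end
end

flag   = FlagSketch.new
before = flag.enabled?
flag.enable!
after  = flag.enabled?
p [before, after]
```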

- (Object) reject(delivery_tag, requeue = false)

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.



# File 'lib/bunny/channel.rb', line 461

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end

- (Object) synchronize(&block)

Synchronizes given block using this channel's mutex.



# File 'lib/bunny/channel.rb', line 1412

def synchronize(&block)
  @publishing_mutex.synchronize(&block)
end

- (Bunny::Queue) temporary_queue(opts = {})

Declares a new server-named queue that is automatically deleted when the connection is closed.

# File 'lib/bunny/channel.rb', line 410

def temporary_queue(opts = {})
  queue("", opts.merge(:exclusive => true))
end
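
Because the options are merged with :exclusive => true last, a caller-supplied :exclusive value is overridden while every other option passes through. The Hash#merge behaviour can be checked in isolation; caller_opts is a made-up example hash.

```ruby
caller_opts = { :exclusive => false, :durable => true }

# Keys in the argument win over keys in the receiver.
effective = caller_opts.merge(:exclusive => true)

p effective
p caller_opts
```

Hash#merge returns a new hash, so the caller's original options are left unmodified.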

- (String) to_s



# File 'lib/bunny/channel.rb', line 1552

def to_s
  "#<#{self.class.name}:#{object_id} @id=#{self.number} @connection=#{@connection.to_s}>"
end

- (Bunny::Exchange) topic(name, opts = {})

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/bunny/channel.rb', line 332

def topic(name, opts = {})
  Exchange.new(self, :topic, name, opts)
end

- (AMQ::Protocol::Tx::CommitOk) tx_commit

Commits current transaction



# File 'lib/bunny/channel.rb', line 1316

def tx_commit
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_commit_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_commit_ok
end

- (AMQ::Protocol::Tx::RollbackOk) tx_rollback

Rolls back current transaction



# File 'lib/bunny/channel.rb', line 1331

def tx_rollback
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_rollback_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_rollback_ok
end

- (AMQ::Protocol::Tx::SelectOk) tx_select

Puts the channel into transaction mode (starts a transaction)



# File 'lib/bunny/channel.rb', line 1300

def tx_select
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @tx_mode = true

  @last_tx_select_ok
end

- (Boolean) using_publisher_confirmations? Also known as: using_publisher_confirms?

Returns true if this channel has Publisher Confirms enabled, false otherwise



# File 'lib/bunny/channel.rb', line 1356

def using_publisher_confirmations?
  @next_publish_seq_no > 0
end

- (Boolean) using_tx?



# File 'lib/bunny/channel.rb', line 1344

def using_tx?
  !!@tx_mode
end

- (Boolean) wait_for_confirms

Blocks calling thread until confirms are received for all currently unacknowledged published messages. Returns immediately if there are no outstanding confirms.



# File 'lib/bunny/channel.rb', line 1400

def wait_for_confirms
  wait_on_confirms_continuations
  read_and_reset_only_acks_received
end