Class: Bunny::Channel

Inherits:
Object
  • Object
Defined in:
lib/bunny/channel.rb

Overview

Channels in RabbitMQ

To quote the AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening Channels

Channels can be opened either via Bunny::Session#create_channel (sufficient in the majority of cases) or by instantiating Bunny::Channel directly:

conn = Bunny.new
conn.start

ch   = conn.create_channel

This will automatically allocate a channel id.

Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

ch = conn.create_channel
ch.close
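
Because a closed channel cannot be reused, short-lived work is often wrapped so that its channel is always closed afterwards. The following is a minimal sketch of that pattern, assuming only Bunny::Session#create_channel, #close and the #open? predicate that #active delegates to. The helper name with_channel is hypothetical and not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): opens a channel on +conn+,
# yields it to the block, and closes it afterwards if it is still open.
def with_channel(conn)
  ch = conn.create_channel
  yield ch
ensure
  # A channel-level exception may already have closed the channel,
  # so only channels that are still open are closed here.
  ch.close if ch && ch.open?
end
```

The block's return value is passed through, and the channel is closed even when the block raises.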

Higher-level API

Bunny offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la the RabbitMQ Java client, Langohr and Pika).

Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

Exchange Operations In Higher-level API

Channel Qos (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
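
The effect of a prefetch limit can be pictured with a small, broker-free model. This is only a sketch: the class PrefetchWindow and its methods are hypothetical names chosen for illustration, not Bunny or RabbitMQ internals. It treats a prefetch count of 0 as "no limit", which is how basic.qos defines it.

```ruby
require "set"

# Hypothetical model of a per-channel prefetch window (not Bunny code).
class PrefetchWindow
  def initialize(prefetch_count)
    @prefetch_count = prefetch_count
    @unacked        = Set.new
    @next_tag       = 1
  end

  # True if another message may be delivered; 0 means "no limit".
  def can_deliver?
    @prefetch_count.zero? || @unacked.size < @prefetch_count
  end

  # Delivers one message and returns its delivery tag,
  # or nil if the window is full.
  def deliver
    return nil unless can_deliver?
    tag = @next_tag
    @next_tag += 1
    @unacked << tag
    tag
  end

  # Acknowledging or rejecting a delivery frees one slot in the window.
  # Returns true if the tag was outstanding.
  def settle(tag)
    !@unacked.delete?(tag).nil?
  end
end
```

With a window of 2, a third delivery is held back until one of the first two is settled.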

Channel IDs

Channels are identified by their ids which are integers. Bunny takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection. It is negotiated with the server and cannot exceed 65535, because channel ids are 16-bit integers. Allocating channels is very cheap on both client and server, so having tens, hundreds or even thousands of channels is not a problem.
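
To make the allocate-and-release cycle concrete, here is a minimal sketch of an id allocator. It is an illustration under stated assumptions, not Bunny's implementation: the class name ChannelIdAllocator and its methods are hypothetical. Id 0 is skipped because AMQP 0.9.1 reserves channel 0 for the connection itself.

```ruby
require "set"

# Hypothetical allocator (not Bunny's implementation): hands out the lowest
# free channel id and makes released ids available again.
class ChannelIdAllocator
  def initialize(max_id)
    @max_id    = max_id
    @allocated = Set.new
  end

  # Returns the lowest free id starting at 1, or nil when every id is in use.
  def allocate
    id = (1..@max_id).find { |candidate| !@allocated.include?(candidate) }
    @allocated << id if id
    id
  end

  # Releases a previously allocated id; returns true if it was in use.
  def release(id)
    !@allocated.delete?(id).nil?
  end
end
```

Released ids are reused by later allocations, which is why explicit ids are rarely needed.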

Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With Bunny, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying channel.close method information.

Examples:

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue Bunny::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "bunny.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue Bunny::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze
SHORTSTR_LIMIT =
255
MAX_PREFETCH_COUNT =

prefetch_count is of type short in the protocol. MK.

(2 ** 16) - 1

Constructor Details

#initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) ⇒ Channel

Returns a new instance of Channel

Parameters:

  • connection (Bunny::Session) (defaults to: nil)

    AMQP 0.9.1 connection

  • id (Integer) (defaults to: nil)

    Channel id, pass nil to make Bunny automatically allocate it

  • work_pool (Bunny::ConsumerWorkPool) (defaults to: ConsumerWorkPool.new(1))

    Thread pool for delivery processing, by default of size 1



# File 'lib/bunny/channel.rb', line 168

def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
  @connection = connection
  @logger     = connection.logger
  @id         = id || @connection.next_channel_id
  @status     = :opening

  @connection.register_channel(self)

  @queues     = Hash.new
  @exchanges  = Hash.new
  @consumers  = Hash.new
  @work_pool  = work_pool

  # synchronizes frameset delivery. MK.
  @publishing_mutex = @connection.mutex_impl.new
  @consumer_mutex   = @connection.mutex_impl.new

  @queue_mutex    = @connection.mutex_impl.new
  @exchange_mutex = @connection.mutex_impl.new

  @unconfirmed_set_mutex = @connection.mutex_impl.new

  self.reset_continuations

  # threads awaiting on continuations. Used to unblock
  # them when network connection goes down so that busy loops
  # that perform synchronous operations can work. MK.
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @next_publish_seq_no = 0
  @delivery_tag_offset = 0

  @recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0)
  @uncaught_exception_handler = Proc.new do |e, consumer|
    @logger.error "Uncaught exception from consumer #{consumer.to_s}: #{e.inspect} @ #{e.backtrace[0]}"
  end
end

Instance Attribute Details

#connection ⇒ Bunny::Session (readonly)

Returns AMQP connection this channel was opened on

# File 'lib/bunny/channel.rb', line 139

def connection
  @connection
end

#consumers ⇒ Hash<String, Bunny::Consumer> (readonly)

Returns Consumer instances declared on this channel

Returns:

  • (Hash<String, Bunny::Consumer>)

    Consumer instances declared on this channel



# File 'lib/bunny/channel.rb', line 155

def consumers
  @consumers
end

#exchanges ⇒ Hash<String, Bunny::Exchange> (readonly)

Returns Exchange instances declared on this channel

Returns:

  • (Hash<String, Bunny::Exchange>)

    Exchange instances declared on this channel



# File 'lib/bunny/channel.rb', line 149

def exchanges
  @exchanges
end

#id ⇒ Integer

Returns Channel id

Returns:

  • (Integer)

    Channel id



# File 'lib/bunny/channel.rb', line 137

def id
  @id
end

#nacked_set ⇒ Set<Integer> (readonly)

Returns Set of message indexes that have been nacked

Returns:

  • (Set<Integer>)

    Set of message indexes that have been nacked



# File 'lib/bunny/channel.rb', line 153

def nacked_set
  @nacked_set
end

#next_publish_seq_no ⇒ Integer (readonly)

Returns Next publisher confirmations sequence index

Returns:

  • (Integer)

    Next publisher confirmations sequence index



# File 'lib/bunny/channel.rb', line 145

def next_publish_seq_no
  @next_publish_seq_no
end

#prefetch_count ⇒ Integer (readonly)

Returns active basic.qos prefetch value

Returns:

  • (Integer)

    active basic.qos prefetch value



# File 'lib/bunny/channel.rb', line 158

def prefetch_count
  @prefetch_count
end

#prefetch_global ⇒ Boolean (readonly)

Returns active basic.qos prefetch global mode

Returns:

  • (Boolean)

    active basic.qos prefetch global mode



# File 'lib/bunny/channel.rb', line 160

def prefetch_global
  @prefetch_global
end

#queues ⇒ Hash<String, Bunny::Queue> (readonly)

Returns Queue instances declared on this channel

Returns:

  • (Hash<String, Bunny::Queue>)

    Queue instances declared on this channel



# File 'lib/bunny/channel.rb', line 147

def queues
  @queues
end

#recoveries_counter ⇒ Object (readonly)

Returns the value of attribute recoveries_counter



# File 'lib/bunny/channel.rb', line 208

def recoveries_counter
  @recoveries_counter
end

#status ⇒ Symbol (readonly)

Returns Channel status (:opening, :open, :closed)

Returns:

  • (Symbol)

    Channel status (:opening, :open, :closed)



# File 'lib/bunny/channel.rb', line 141

def status
  @status
end

#unconfirmed_set ⇒ Set<Integer> (readonly)

Returns Set of published message indexes that are currently unconfirmed

Returns:

  • (Set<Integer>)

    Set of published message indexes that are currently unconfirmed



# File 'lib/bunny/channel.rb', line 151

def unconfirmed_set
  @unconfirmed_set
end

#work_pool ⇒ Bunny::ConsumerWorkPool (readonly)

Returns Thread pool delivered messages are dispatched to.

Returns:

  • (Bunny::ConsumerWorkPool)

    Thread pool delivered messages are dispatched to.



# File 'lib/bunny/channel.rb', line 143

def work_pool
  @work_pool
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to acknowledge

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this one be acknowledged as well?

# File 'lib/bunny/channel.rb', line 475

def ack(delivery_tag, multiple = false)
  basic_ack(delivery_tag.to_i, multiple)
end

#active ⇒ Boolean

Returns true if this channel is open

Returns:

  • (Boolean)

    true if this channel is open



# File 'lib/bunny/channel.rb', line 273

def active
  open?
end

#any_consumers? ⇒ Boolean

Returns true if there are consumers on this channel

Returns:

  • (Boolean)

    true if there are consumers on this channel



# File 'lib/bunny/channel.rb', line 972

def any_consumers?
  @consumer_mutex.synchronize { @consumers.any? }
end

#basic_ack(delivery_tag, multiple = false) ⇒ NilClass

Acknowledges a delivery (message).

Examples:

Ack a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
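# the queue name is illustrative; manual acknowledgements must be enabled
q     = ch.queue("bunny.examples.queue3")
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # acknowledge the delivery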
  ch.basic_ack(delivery_info.delivery_tag.to_i)
end

Ack a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_ack(delivery_info.delivery_tag.to_i)

Ack multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# ack all fetched messages up to payload3
ch.basic_ack(delivery_info.delivery_tag.to_i, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be acknowledged?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 761

def basic_ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))

    nil
  end
end
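
The multiple flag is easiest to see on a small model of a channel's outstanding deliveries. The sketch below is pure Ruby and does not talk to a broker. The class name OutstandingDeliveries and its methods are hypothetical and only mirror the basic.ack semantics described above.

```ruby
require "set"

# Hypothetical model of the unacknowledged deliveries on one channel.
class OutstandingDeliveries
  def initialize
    @tags = Set.new
  end

  # Records a delivery with the given tag.
  def deliver(tag)
    @tags << tag
    tag
  end

  # Mirrors basic.ack: with multiple = true, every outstanding delivery
  # whose tag is <= delivery_tag is settled; otherwise only that one tag.
  # Returns the settled tags in ascending order.
  def ack(delivery_tag, multiple = false)
    settled = if multiple
                @tags.select { |tag| tag <= delivery_tag }
              else
                @tags.select { |tag| tag == delivery_tag }
              end
    settled.each { |tag| @tags.delete(tag) }
    settled.sort
  end

  # Tags that are still waiting for an acknowledgement.
  def pending
    @tags.sort
  end
end
```

Acknowledging the highest tag of a batch with multiple = true settles the whole batch in a single frame.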

#basic_cancel(consumer_tag) ⇒ AMQ::Protocol::Basic::CancelOk

Removes a consumer. Messages for this consumer will no longer be delivered. If the queue it was on is auto-deleted and this consumer was the last one, the queue will be deleted.

Parameters:

  • consumer_tag (String)

    Consumer tag (unique identifier) to cancel

Returns:

  • (AMQ::Protocol::Basic::CancelOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 956

def basic_cancel(consumer_tag)
  @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_cancel_ok = wait_on_continuations
  end

  # reduces thread usage for channels that don't have any
  # consumers
  @work_pool.shutdown(true) unless self.any_consumers?

  @last_basic_cancel_ok
end

#basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block) ⇒ AMQ::Protocol::Basic::ConsumeOk Also known as: consume

Registers a consumer for queue. Delivered messages will be handled with the block provided to this method.

Parameters:

  • queue (String, Bunny::Queue)

    Queue to consume from

  • consumer_tag (String) (defaults to: generate_consumer_tag)

    Consumer tag (unique identifier), generated by Bunny by default

  • no_ack (Boolean) (defaults to: false)

    (false) If true, delivered messages will be automatically acknowledged. If false, manual acknowledgements will be necessary.

  • exclusive (Boolean) (defaults to: false)

    (false) Should this consumer be exclusive?

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments that may be used by RabbitMQ extensions, etc

Returns:

  • (AMQ::Protocol::Basic::ConsumeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 848

def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  queue_name = if queue.respond_to?(:name)
                 queue.name
               else
                 queue
               end

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      queue_name,
      consumer_tag,
      false,
      no_ack,
      exclusive,
      false,
      arguments))

  begin
    Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)

  @last_basic_consume_ok
end
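
A common way to use #basic_consume is to leave no_ack at its default (false) and acknowledge each delivery after it has been handled. The following is a minimal sketch of that pattern. The helper name consume_and_ack is hypothetical, and the sketch assumes the block receives delivery info, properties and payload, as in the subscribe examples on this page.

```ruby
# Hypothetical helper (not part of Bunny): registers a consumer with manual
# acknowledgements and acks each delivery once the handler has returned.
def consume_and_ack(ch, queue_name, &handler)
  # consumer_tag keeps its default, so Bunny generates one; no_ack defaults
  # to false, which means deliveries must be acknowledged manually.
  ch.basic_consume(queue_name) do |delivery_info, properties, payload|
    handler.call(payload, properties)
    ch.basic_ack(delivery_info.delivery_tag.to_i)
  end
end
```

If the handler raises, the delivery is left unacknowledged. What happens to it then depends on how the application settles or recovers such deliveries.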

#basic_consume_with(consumer) ⇒ AMQ::Protocol::Basic::ConsumeOk Also known as: consume_with

Registers a Bunny::Consumer instance as a consumer on its queue.

Parameters:

  • consumer (Bunny::Consumer)

    Consumer to register. It should already have queue name, consumer tag and other attributes set.

Returns:

  • (AMQ::Protocol::Basic::ConsumeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 904

def basic_consume_with(consumer)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    register_consumer(consumer.consumer_tag, consumer)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      consumer.queue_name,
      consumer.consumer_tag,
      false,
      consumer.no_ack,
      consumer.exclusive,
      false,
      consumer.arguments))

  begin
    Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  register_consumer(@last_basic_consume_ok.consumer_tag, consumer)

  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_consume_ok
end

#basic_get(queue, opts = {:manual_ack => true}) ⇒ Array

Synchronously fetches a message from the queue, if there are any. This method is for cases when the convenience of synchronous operations is more important than throughput.

Examples:

Using Bunny::Channel#basic_get with manual acknowledgements

conn = Bunny.new
conn.start
ch   = conn.create_channel
# here we assume the queue already exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :manual_ack => true)
ch.acknowledge(delivery_info.delivery_tag)

Parameters:

  • queue (String)

    Queue name

  • opts (Hash) (defaults to: {:manual_ack => true})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: true

    [DEPRECATED] Use :manual_ack instead

  • :manual_ack (Boolean) — default: true

    Will this message be acknowledged manually?

Returns:

  • (Array)

    A triple of delivery info, message properties and message content

# File 'lib/bunny/channel.rb', line 588

def basic_get(queue, opts = {:manual_ack => true})
  raise_if_no_longer_open!

  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:manual_ack])))
  # this is a workaround for the edge case when basic_get is called in a tight loop
  # and network goes down we need to perform recovery. The problem is, basic_get will
  # keep blocking the thread that calls it without clear way to constantly unblock it
  # from the network activity loop (where recovery happens) with the current continuations
  # implementation (and even more correct and convenient ones, such as wait/notify, should
  # we implement them). So we return a triple of nils immediately which apps should be
  # able to handle anyway as "got no message, no need to act". MK.
  last_basic_get_response = if @connection.open?
                              begin
                                wait_on_basic_get_continuations
                              rescue Timeout::Error => e
                                raise_if_continuation_resulted_in_a_channel_error!
                                raise e
                              end
                            else
                              [nil, nil, nil]
                            end

  raise_if_continuation_resulted_in_a_channel_error!
  last_basic_get_response
end
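
Since #basic_get returns a triple of nils when there is nothing to fetch, a polling loop can use that as its stop condition. Here is a minimal sketch under that assumption. The helper name drain_queue and its limit parameter are hypothetical and not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): fetches up to +limit+ messages
# with basic.get, acknowledges each one, and returns their payloads.
def drain_queue(ch, queue_name, limit = 100)
  payloads = []
  limit.times do
    delivery_info, _properties, payload = ch.basic_get(queue_name, :manual_ack => true)
    # A triple of nils means "no message available": stop polling.
    break if delivery_info.nil?
    payloads << payload
    ch.basic_ack(delivery_info.delivery_tag.to_i)
  end
  payloads
end
```

The limit keeps the loop bounded when publishers add messages faster than they are fetched.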

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Examples:

Requeue a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
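# the queue name is illustrative; manual acknowledgements must be enabled
q     = ch.queue("bunny.examples.queue3")
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message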
  ch.basic_nack(delivery_info.delivery_tag, false, true)
end

Reject a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
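# the queue name is illustrative; manual acknowledgements must be enabled
q     = ch.queue("bunny.examples.queue3")
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message (requeue defaults to false)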
  ch.basic_nack(delivery_info.delivery_tag)
end

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_nack(delivery_info.delivery_tag, false, true)

Requeue multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# requeue all fetched messages up to payload3
ch.basic_nack(delivery_info.delivery_tag, true, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be rejected/requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 823

def basic_nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
                                                             delivery_tag,
                                                             multiple,
                                                             requeue))

    nil
  end
end
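
Acknowledging on success and requeueing on failure can be combined in one small wrapper. This is a minimal sketch, assuming manual acknowledgements are in use. The helper name ack_or_requeue and its return symbols are hypothetical and not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): runs the block for one delivery,
# acks it on success and nacks it with requeue when the block raises.
def ack_or_requeue(ch, delivery_info)
  tag = delivery_info.delivery_tag
  begin
    yield
  rescue StandardError
    # multiple = false, requeue = true
    ch.basic_nack(tag, false, true)
    return :requeued
  end
  ch.basic_ack(tag.to_i)
  :acked
end
```

A message that always fails would be requeued indefinitely with this sketch, so real handlers usually add a retry limit or reject without requeue at some point.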

#basic_publish(payload, exchange, routing_key, opts = {}) ⇒ Bunny::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Parameters:

  • payload (String)

    Message payload. It will never be modified by Bunny or RabbitMQ in any way.

  • exchange (String)

    Exchange to publish to

  • routing_key (String)

    Routing key

  • opts (Hash) (defaults to: {})

    Publishing options

Options Hash (opts):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

  • (Bunny::Channel)

    Self

Raises:

  • (ArgumentError)

    If the routing key is longer than SHORTSTR_LIMIT (255) characters


# File 'lib/bunny/channel.rb', line 526

def basic_publish(payload, exchange, routing_key, opts = {})
  raise_if_no_longer_open!
  raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  mode = if opts.fetch(:persistent, true)
           2
         else
           1
         end

  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0

  if @next_publish_seq_no > 0
    @unconfirmed_set_mutex.synchronize do
      @unconfirmed_set.add(@next_publish_seq_no)
      @next_publish_seq_no += 1
    end
  end

  frames = AMQ::Protocol::Basic::Publish.encode(@id,
    payload,
    opts,
    exchange_name,
    routing_key,
    opts[:mandatory],
    false,
    @connection.frame_max)
  @connection.send_frameset(frames, self)

  self
end
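
As an illustration of the options hash, the sketch below publishes a JSON document as a persistent message. It relies only on the #basic_publish signature and the :content_type and :persistent options listed above. The helper name publish_json is hypothetical and not part of Bunny.

```ruby
require "json"

# Hypothetical helper (not part of Bunny): serializes +data+ as JSON and
# publishes it as a persistent message via Channel#basic_publish.
def publish_json(ch, exchange_name, routing_key, data)
  ch.basic_publish(JSON.generate(data), exchange_name, routing_key,
                   :content_type => "application/json",
                   :persistent   => true)
end
```

Because #basic_publish returns the channel, the helper returns the channel as well and calls can be chained.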

#basic_qos(count, global = false) ⇒ AMQ::Protocol::Basic::QosOk Also known as: prefetch

Controls message delivery rate using basic.qos AMQP 0.9.1 method.

Parameters:

  • count (Integer)

    How many messages consumers on this channel can be given at a time (before they have to acknowledge or reject one of the earlier received messages). Must be between 0 and MAX_PREFETCH_COUNT.

  • global (Boolean) (defaults to: false)

    Whether to use global mode for prefetch:

    • false: per-consumer
    • true: per-channel

    Note that the default value (false) has not changed. Earlier documentation described it as per-channel and unsupported in RabbitMQ, whereas it means per-consumer and is supported (https://www.rabbitmq.com/consumer-prefetch.html).

Returns:

  • (AMQ::Protocol::Basic::QosOk)

    RabbitMQ response

Raises:

  • (ArgumentError)

# File 'lib/bunny/channel.rb', line 639

def basic_qos(count, global = false)
  raise ArgumentError.new("prefetch count must be a positive integer, given: #{count}") if count < 0
  raise ArgumentError.new("prefetch count must be no greater than #{MAX_PREFETCH_COUNT}, given: #{count}") if count > MAX_PREFETCH_COUNT
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, count, global))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_qos_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @prefetch_count  = count
  @prefetch_global = global

  @last_basic_qos_ok
end

#basic_recover(requeue) ⇒ AMQ::Protocol::Basic::RecoverOk

Redeliver unacknowledged messages

Parameters:

  • requeue (Boolean)

    Should messages be requeued?

Returns:

  • (AMQ::Protocol::Basic::RecoverOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 663

def basic_recover(requeue)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_basic_recover_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_recover_ok
end

#basic_reject(delivery_tag, requeue = false) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
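# the queue name is illustrative; manual acknowledgements must be enabled
q     = ch.queue("bunny.examples.queue3")
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message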
  ch.basic_reject(delivery_info.delivery_tag, true)
end

Reject a message

conn  = Bunny.new
conn.start

ch    = conn.create_channel
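# the queue name is illustrative; manual acknowledgements must be enabled
q     = ch.queue("bunny.examples.queue3")
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message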
  ch.basic_reject(delivery_info.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 713

def basic_reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))

    nil
  end
end
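
One way to choose the requeue argument is to retry a failed delivery once and then reject it for good. The sketch below assumes the caller already knows whether the delivery is a redelivery (basic.deliver carries such a flag) and passes it in as a plain boolean. The helper name reject_with_single_retry is hypothetical and not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): requeues a failed delivery once
# and rejects it without requeue if it had already been redelivered.
def reject_with_single_retry(ch, delivery_tag, redelivered)
  requeue = !redelivered
  ch.basic_reject(delivery_tag, requeue)
  requeue ? :requeued : :rejected
end
```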

#channel_flow(active) ⇒ AMQ::Protocol::Channel::FlowOk

Note:

Recent RabbitMQ versions (e.g. 2.8.x, 3.x) employ TCP/IP-level back pressure on publishers when consumers do not keep up with them.

Enables or disables message flow for the channel. When message flow is disabled, no new messages will be delivered to consumers on this channel. This is typically used by consumers that cannot keep up with the influx of messages.

Returns:

  • (AMQ::Protocol::Channel::FlowOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1303

def channel_flow(active)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_channel_flow_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_channel_flow_ok
end

#client ⇒ Bunny::Session

Returns Connection this channel was opened on

# File 'lib/bunny/channel.rb', line 278

def client
  @connection
end

#close ⇒ Object

Closes the channel. Closed channels can no longer be used (this includes associated Queue, Exchange and Bunny::Consumer instances).



# File 'lib/bunny/channel.rb', line 235

def close
  @connection.close_channel(self)
  @status = :closed
  @work_pool.shutdown
  maybe_kill_consumer_work_pool!
end

#closed? ⇒ Boolean

Returns true if this channel is closed (manually or because of an exception), false otherwise

Returns:

  • (Boolean)

    true if this channel is closed (manually or because of an exception), false otherwise



# File 'lib/bunny/channel.rb', line 250

def closed?
  @status == :closed
end

#confirm_select(callback = nil) ⇒ AMQ::Protocol::Confirm::SelectOk

Enables publisher confirms for the channel.

Returns:

  • (AMQ::Protocol::Confirm::SelectOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1392

def confirm_select(callback = nil)
  raise_if_no_longer_open!

  if @next_publish_seq_no == 0
    @confirms_continuations = new_continuation
    @unconfirmed_set        = Set.new
    @nacked_set             = Set.new
    @next_publish_seq_no    = 1
    @only_acks_received = true
  end

  @confirms_callback = callback

  @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_confirm_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @last_confirm_select_ok
end
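
The listing above starts the publish sequence at 1 and creates the unconfirmed and nacked sets. How those sets evolve as confirmations arrive can be pictured with a small model. This is a sketch of the general publisher-confirms bookkeeping, not Bunny's implementation: the class ConfirmTracker and its methods are hypothetical, and the handling of the multiple flag is an assumption based on basic.ack semantics.

```ruby
require "set"

# Hypothetical model of publisher-confirm bookkeeping (not Bunny code).
class ConfirmTracker
  attr_reader :unconfirmed, :nacked

  def initialize
    @next_seq_no = 1 # sequence numbers start at 1 once confirms are enabled
    @unconfirmed = Set.new
    @nacked      = Set.new
  end

  # Records a publish and returns the sequence number assigned to it.
  def publish
    seq_no = @next_seq_no
    @unconfirmed << seq_no
    @next_seq_no += 1
    seq_no
  end

  # Handles one confirmation. With multiple = true, every unconfirmed
  # sequence number <= delivery_tag is settled; nack = true records the
  # settled numbers as nacked. Returns true when nothing is left unconfirmed.
  def confirm(delivery_tag, multiple = false, nack = false)
    settled = multiple ? @unconfirmed.select { |n| n <= delivery_tag } : [delivery_tag]
    settled.each do |n|
      next unless @unconfirmed.delete?(n)
      @nacked << n if nack
    end
    @unconfirmed.empty?
  end
end
```

A publisher that waits for confirms can treat an empty unconfirmed set as "everything published so far has been settled" and then inspect the nacked set.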

#default_exchange ⇒ Object

Provides access to the default exchange



# File 'lib/bunny/channel.rb', line 371

def default_exchange
  Exchange.default(self)
end

#direct(name, opts = {}) ⇒ Bunny::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 328

def direct(name, opts = {})
  find_exchange(name) || Exchange.new(self, :direct, name, opts)
end
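
The "look up or declare" pattern used by this method can be sketched in isolation. ExchangeSketch and ExchangeCacheSketch are hypothetical stand-ins, and registering the new object inside the cache class is an assumption made to keep the example self-contained.

```ruby
# Hypothetical stand-ins; these are not Bunny classes.
ExchangeSketch = Struct.new(:type, :name, :opts)

class ExchangeCacheSketch
  def initialize
    @exchanges = {}
  end

  def find_exchange(name)
    @exchanges[name]
  end

  # Same shape as the method above: return the cached object or build a new one.
  def direct(name, opts = {})
    find_exchange(name) || register(ExchangeSketch.new(:direct, name, opts))
  end

  private

  def register(exchange)
    @exchanges[exchange.name] = exchange
  end
end

cache  = ExchangeCacheSketch.new
first  = cache.direct("orders", :durable => true)
second = cache.direct("orders", :durable => false)
```

In this model the second call returns the cached object, so the options passed on a cache hit are ignored.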

#exchange(name, opts = {}) ⇒ Bunny::Exchange

Declares an exchange of the type given in opts[:type] (:direct by default).

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or "x-consistent-hash"

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 389

def exchange(name, opts = {})
  Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end

#exchange_bind(source, destination, opts = {}) ⇒ AMQ::Protocol::Exchange::BindOk

Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Exchange::BindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1214

def exchange_bind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_bind_ok
end
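
Two details of the listing above are easy to miss: source and destination may be given as names or as objects that respond to #name, and the destination is encoded before the source. A minimal sketch, assuming hypothetical helper names (entity_name, exchange_bind_arguments) and a stand-in struct:

```ruby
# Hypothetical stand-in for an exchange object that exposes #name.
NamedExchangeSketch = Struct.new(:name)

# Accepts either a name or an object with #name, as the listing above does.
def entity_name(entity)
  entity.respond_to?(:name) ? entity.name : entity
end

# Returns the arguments in the order the listing passes them to Bind.encode
# (after the channel id): destination, source, routing key, nowait, arguments.
def exchange_bind_arguments(source, destination, opts = {})
  [entity_name(destination), entity_name(source), opts[:routing_key], false, opts[:arguments]]
end

args = exchange_bind_arguments(NamedExchangeSketch.new("events"), "audit", :routing_key => "user.#")
```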

#exchange_declare(name, type, opts = {}) ⇒ AMQ::Protocol::Exchange::DeclareOk

Declares an exchange using the exchange.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • type (String, Symbol)

    Exchange type, e.g. :fanout or :topic

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this exchange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) — default: false

    Should this exchange be deleted when it is no longer used?

  • passive (Boolean) — default: false

    If true, exchange will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • (AMQ::Protocol::Exchange::DeclareOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1154

def exchange_declare(name, type, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id,
      name,
      type.to_s,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:auto_delete, false),
      opts.fetch(:internal, false),
      false, # nowait
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_declare_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_declare_ok
end

#exchange_delete(name, opts = {}) ⇒ AMQ::Protocol::Exchange::DeleteOk

Deletes an exchange using the exchange.delete AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used?

Returns:

  • (AMQ::Protocol::Exchange::DeleteOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1184

def exchange_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id,
      name,
      opts[:if_unused],
      false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_delete_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_delete_ok
end

#exchange_unbind(source, destination, opts = {}) ⇒ AMQ::Protocol::Exchange::UnbindOk

Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Exchange::UnbindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1258

def exchange_unbind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_exchange_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_unbind_ok
end

#fanout(name, opts = {}) ⇒ Bunny::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 310

def fanout(name, opts = {})
  find_exchange(name) || Exchange.new(self, :fanout, name, opts)
end

#flow(active) ⇒ Object

Flow control. When set to false, RabbitMQ will stop delivering messages on this channel.

Parameters:

  • active (Boolean)

    Should messages to consumers on this channel be delivered?



# File 'lib/bunny/channel.rb', line 438

def flow(active)
  channel_flow(active)
end

#generate_consumer_tag(name = "bunny") ⇒ String

Generates a unique string intended for use as a consumer tag.

Returns:

  • (String)

    Unique string.



# File 'lib/bunny/channel.rb', line 1443

def generate_consumer_tag(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
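
The tag format can be inspected with a standalone copy of the expression above. The function name consumer_tag_sketch is hypothetical; the string interpolation is the same as in the listing.

```ruby
# Standalone copy of the expression in the listing above.
def consumer_tag_sketch(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

tag = consumer_tag_sketch("worker")

# Splitting on "-" is only safe here because the chosen prefix has no dash.
prefix, timestamp_ms, random_part = tag.split("-")
```

Because the timestamp is whole seconds multiplied by 1000, its last three digits are always zero, so uniqueness within the same second rests on the random component alone.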

#headers(name, opts = {}) ⇒ Bunny::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 364

def headers(name, opts = {})
  find_exchange(name) || Exchange.new(self, :headers, name, opts)
end

#inspect ⇒ Object



# File 'lib/bunny/channel.rb', line 259

def inspect
  to_s
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • multiple (Boolean) (defaults to: false)

    Should all unacknowledged messages up to and including this one be rejected as well?

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of being dropped?

# File 'lib/bunny/channel.rb', line 490

def nack(delivery_tag, multiple = false, requeue = false)
  basic_nack(delivery_tag.to_i, multiple, requeue)
end
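
The effect of the multiple flag can be modelled without a broker. This is a sketch of the semantics only; tags_covered_by_nack is a hypothetical helper and the list of outstanding delivery tags is made up for illustration.

```ruby
# Returns the delivery tags a single nack would cover.
# With multiple = true, every outstanding tag up to and including
# delivery_tag is rejected; otherwise only delivery_tag itself.
def tags_covered_by_nack(outstanding_tags, delivery_tag, multiple = false)
  if multiple
    outstanding_tags.select { |tag| tag <= delivery_tag }
  else
    outstanding_tags.select { |tag| tag == delivery_tag }
  end
end

outstanding = [1, 2, 3, 4, 5]
single      = tags_covered_by_nack(outstanding, 3)
batch       = tags_covered_by_nack(outstanding, 3, true)
```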

#number ⇒ Integer

Returns Channel id

Returns:

  • (Integer)

    Channel id



# File 'lib/bunny/channel.rb', line 268

def number
  self.id
end

#on_error(&block) ⇒ Object

Defines a handler for errors that are not responses to a particular operation (e.g. basic.ack, basic.reject, basic.nack).



# File 'lib/bunny/channel.rb', line 1458

def on_error(&block)
  @on_error = block
end

#on_uncaught_exception(&block) ⇒ Object

Defines a handler for uncaught exceptions in consumers (e.g. delivered message handlers).



# File 'lib/bunny/channel.rb', line 1466

def on_uncaught_exception(&block)
  @uncaught_exception_handler = block
end

#open ⇒ Bunny::Channel

Opens the channel and resets its internal state

Returns:

  • (Bunny::Channel)

    This channel (self)

# File 'lib/bunny/channel.rb', line 218

def open
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @connection.open_channel(self)
  # clear last channel error
  @last_channel_error = nil

  @status = :open

  self
end

#open? ⇒ Boolean

Returns true if this channel is open, false otherwise

Returns:

  • (Boolean)

    true if this channel is open, false otherwise



# File 'lib/bunny/channel.rb', line 244

def open?
  @status == :open
end

#queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Bunny::Queue

Declares a queue or looks it up in the per-channel cache.

Parameters:

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).

  • opts (Hash) (defaults to: {})

    Queue properties and other options

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • (Bunny::Queue)

    Queue that was declared or looked up in the cache

# File 'lib/bunny/channel.rb', line 412

def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
  q = find_queue(name) || Bunny::Queue.new(self, name, opts)

  register_queue(q)
end

#queue_bind(name, exchange, opts = {}) ⇒ AMQ::Protocol::Queue::BindOk

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Queue::BindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1076

def queue_bind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id,
      name,
      exchange_name,
      (opts[:routing_key] || opts[:key]),
      false,
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_bind_ok
end

#queue_declare(name, opts = {}) ⇒ AMQ::Protocol::Queue::DeclareOk

Declares a queue using queue.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • auto_delete (Boolean) — default: false

    Should this queue be deleted when the last consumer is cancelled?

  • exclusive (Boolean) — default: false

    Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed

  • passive (Boolean) — default: false

    If true, queue will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • (AMQ::Protocol::Queue::DeclareOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 998

def queue_declare(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id,
      name,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:exclusive, false),
      opts.fetch(:auto_delete, false),
      false,
      opts[:arguments]))
  @last_queue_declare_ok = wait_on_continuations

  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_declare_ok
end
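
How the option defaults in the listing above resolve can be shown with a small sketch. queue_declare_flags is a hypothetical helper that mirrors only the opts.fetch calls; it does not encode or send anything.

```ruby
# Mirrors the opts.fetch(..., false) calls from the listing above.
def queue_declare_flags(opts = {})
  {
    :passive     => opts.fetch(:passive, false),
    :durable     => opts.fetch(:durable, false),
    :exclusive   => opts.fetch(:exclusive, false),
    :auto_delete => opts.fetch(:auto_delete, false)
  }
end

defaults   = queue_declare_flags
durable    = queue_declare_flags(:durable => true)
nil_passed = queue_declare_flags(:durable => nil)
```

Hash#fetch falls back to the default only when the key is absent, so an explicit nil is passed through unchanged rather than replaced with false.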

#queue_delete(name, opts = {}) ⇒ AMQ::Protocol::Queue::DeleteOk

Deletes a queue using queue.delete AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

Returns:

  • (AMQ::Protocol::Queue::DeleteOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1027

def queue_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id,
      name,
      opts[:if_unused],
      opts[:if_empty],
      false))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_delete_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_delete_ok
end

#queue_purge(name, opts = {}) ⇒ AMQ::Protocol::Queue::PurgeOk

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

Returns:

  • (AMQ::Protocol::Queue::PurgeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1050

def queue_purge(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_purge_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_purge_ok
end

#queue_unbind(name, exchange, opts = {}) ⇒ AMQ::Protocol::Queue::UnbindOk

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Queue::UnbindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1112

def queue_unbind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id,
      name,
      exchange_name,
      opts[:routing_key],
      opts[:arguments]))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_queue_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_unbind_ok
end

#recover(ignored = true) ⇒ Object

Tells RabbitMQ to redeliver unacknowledged messages



# File 'lib/bunny/channel.rb', line 444

def recover(ignored = true)
  # RabbitMQ only supports basic.recover with requeue = true
  basic_recover(true)
end

#recover_cancelled_consumers! ⇒ Object



# File 'lib/bunny/channel.rb', line 1564

def recover_cancelled_consumers!
  @recover_cancelled_consumers = true
end

#recover_confirm_mode ⇒ Object

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1506

def recover_confirm_mode
  if using_publisher_confirmations?
    @unconfirmed_set.clear
    @delivery_tag_offset = @next_publish_seq_no - 1
    confirm_select(@confirms_callback)
  end
end
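
The offset arithmetic above can be worked through with concrete numbers. The sketch assumes that broker-side confirm delivery tags restart at 1 once confirms are re-enabled on the recovered channel, and that the offset is added to incoming tags to recover the client-side sequence number; the second point is an assumption about code not shown in this section, and both helper names are hypothetical.

```ruby
# Offset computed the same way as in recover_confirm_mode.
def delivery_tag_offset_after_recovery(next_publish_seq_no)
  next_publish_seq_no - 1
end

# Assumed translation from a broker delivery tag to a client sequence number.
def client_sequence_number(broker_delivery_tag, delivery_tag_offset)
  broker_delivery_tag + delivery_tag_offset
end

next_publish_seq_no = 6 # five messages were published before the failure
offset              = delivery_tag_offset_after_recovery(next_publish_seq_no)
first_after_failure = client_sequence_number(1, offset)
```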

#recover_consumers ⇒ Object

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1547

def recover_consumers
  unless @consumers.empty?
    @work_pool = ConsumerWorkPool.new(@work_pool.size, @work_pool.abort_on_exception)
    @work_pool.start
  end

  @consumer_mutex.synchronize { @consumers.values }.each do |c|
    c.recover_from_network_failure
  end
end

#recover_exchanges ⇒ Object

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1526

def recover_exchanges
  @exchange_mutex.synchronize { @exchanges.values }.each do |x|
    x.recover_from_network_failure
  end
end

#recover_from_network_failure ⇒ Object

Recovers the basic.qos setting, publisher confirms mode, transaction mode, exchanges, queues (including bindings) and consumers, in that order. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1480

def recover_from_network_failure
  @logger.debug { "Recovering channel #{@id} after network failure" }
  release_all_continuations

  recover_prefetch_setting
  recover_confirm_mode
  recover_tx_mode
  recover_exchanges
  # this includes recovering bindings
  recover_queues
  recover_consumers
  increment_recoveries_counter
end

#recover_prefetch_setting ⇒ Object

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1498

def recover_prefetch_setting
  basic_qos(@prefetch_count, @prefetch_global) if @prefetch_count
end

#recover_queues ⇒ Object

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1536

def recover_queues
  @queue_mutex.synchronize { @queues.values }.each do |q|
    @logger.debug { "Recovering queue #{q.name}" }
    q.recover_from_network_failure
  end
end

#recover_tx_mode ⇒ Object

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1518

def recover_tx_mode
  tx_select if @tx_mode
end

#recovers_cancelled_consumers? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/channel.rb', line 1569

def recovers_cancelled_consumers?
  !!@recover_cancelled_consumers
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of dropping it?

# File 'lib/bunny/channel.rb', line 464

def reject(delivery_tag, requeue = false)
  basic_reject(delivery_tag.to_i, requeue)
end

#synchronize(&block) ⇒ Object

Synchronizes given block using this channel's mutex.



# File 'lib/bunny/channel.rb', line 1435

def synchronize(&block)
  @publishing_mutex.synchronize(&block)
end
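
A minimal sketch of what delegating to a mutex gives callers. PublishingLockSketch is a hypothetical stand-in, and a plain Mutex is assumed here; the lock class the real channel uses is not shown in this section.

```ruby
# Hypothetical stand-in that delegates to a mutex like the method above.
class PublishingLockSketch
  def initialize
    @publishing_mutex = Mutex.new # assumed lock type
  end

  def synchronize(&block)
    @publishing_mutex.synchronize(&block)
  end
end

lock    = PublishingLockSketch.new
counter = 0

threads = 4.times.map do
  Thread.new do
    # Each read-modify-write runs while holding the lock.
    1_000.times { lock.synchronize { counter += 1 } }
  end
end
threads.each(&:join)
```

Only one thread at a time runs the block, so the four threads never interleave their updates to counter.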

#temporary_queue(opts = {}) ⇒ Bunny::Queue

Declares a new server-named queue that is automatically deleted when the connection is closed.

Returns:

  • (Bunny::Queue)

# File 'lib/bunny/channel.rb', line 424

def temporary_queue(opts = {})
  queue("", opts.merge(:exclusive => true))
end
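
The option handling above relies on Hash#merge, which lets the forced :exclusive flag win over whatever the caller passed. A small sketch, with temporary_queue_options as a hypothetical helper that reproduces only the merge:

```ruby
# Reproduces the opts.merge call from the listing above.
def temporary_queue_options(opts = {})
  opts.merge(:exclusive => true)
end

caller_opts = { :exclusive => false, :durable => true }
effective   = temporary_queue_options(caller_opts)
```

merge returns a new hash, so the caller's hash is left untouched while the effective options always have :exclusive set to true.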

#to_s ⇒ String

Returns Brief human-readable representation of the channel

Returns:

  • (String)

    Brief human-readable representation of the channel



# File 'lib/bunny/channel.rb', line 1577

def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "<#{self.class.name}:#{oid} number=#{@id} @open=#{open?} connection=#{@connection.to_s}>"
end

#topic(name, opts = {}) ⇒ Bunny::Exchange

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 346

def topic(name, opts = {})
  find_exchange(name) || Exchange.new(self, :topic, name, opts)
end

#tx_commit ⇒ AMQ::Protocol::Tx::CommitOk

Commits current transaction

Returns:

  • (AMQ::Protocol::Tx::CommitOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1340

def tx_commit
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_commit_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_commit_ok
end

#tx_rollback ⇒ AMQ::Protocol::Tx::RollbackOk

Rolls back current transaction

Returns:

  • (AMQ::Protocol::Tx::RollbackOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1355

def tx_rollback
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_rollback_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_rollback_ok
end

#tx_select ⇒ AMQ::Protocol::Tx::SelectOk

Puts the channel into transaction mode (starts a transaction)

Returns:

  • (AMQ::Protocol::Tx::SelectOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1324

def tx_select
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout) do
    @last_tx_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @tx_mode = true

  @last_tx_select_ok
end
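
One way the three transaction methods might be combined is a small wrapper that commits when a block succeeds and rolls back when it raises. with_transaction and RecordingChannelSketch are hypothetical: the wrapper only calls tx_select, tx_commit and tx_rollback, and the recording class stands in for a channel so the sketch runs without a broker.

```ruby
# Hypothetical helper: commit if the block succeeds, roll back if it raises.
def with_transaction(channel)
  channel.tx_select
  begin
    result = yield channel
  rescue StandardError
    channel.tx_rollback
    raise
  end
  channel.tx_commit
  result
end

# Stand-in that records which transaction methods were called.
class RecordingChannelSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def tx_select
    @calls << :tx_select
  end

  def tx_commit
    @calls << :tx_commit
  end

  def tx_rollback
    @calls << :tx_rollback
  end
end

ok = RecordingChannelSketch.new
with_transaction(ok) { |_ch| :published }

failed = RecordingChannelSketch.new
begin
  with_transaction(failed) { |_ch| raise ArgumentError, "boom" }
rescue ArgumentError
  # The original error is re-raised after the rollback.
end
```

The rollback is limited to failures inside the block. Errors raised by tx_select or tx_commit themselves are left to propagate, since a channel-level error would have closed the channel already.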

#using_publisher_confirmations? ⇒ Boolean Also known as: using_publisher_confirms?

Returns true if this channel has Publisher Confirms enabled, false otherwise

Returns:

  • (Boolean)

    true if this channel has Publisher Confirms enabled, false otherwise



# File 'lib/bunny/channel.rb', line 1380

def using_publisher_confirmations?
  @next_publish_seq_no > 0
end

#using_tx? ⇒ Boolean

Returns true if this channel has transactions enabled

Returns:

  • (Boolean)

    true if this channel has transactions enabled



# File 'lib/bunny/channel.rb', line 1368

def using_tx?
  !!@tx_mode
end

#wait_for_confirms ⇒ Boolean

Blocks calling thread until confirms are received for all currently unacknowledged published messages. Returns immediately if there are no outstanding confirms.

Returns:

  • (Boolean)

    true if all messages were acknowledged positively since the last time this method was called, false otherwise

# File 'lib/bunny/channel.rb', line 1423

def wait_for_confirms
  wait_on_confirms_continuations
  read_and_reset_only_acks_received
end
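
The bookkeeping behind the return value can be modelled in a few lines. ConfirmTrackerSketch is a hypothetical, single-threaded simplification: the instance variable names follow confirm_select above, but the handling of acks and nacks is an assumption, and the real method also blocks until the unconfirmed set is empty.

```ruby
require "set"

# Hypothetical, single-threaded model of publisher confirms bookkeeping.
class ConfirmTrackerSketch
  def initialize
    @next_publish_seq_no = 1
    @unconfirmed_set     = Set.new
    @only_acks_received  = true
  end

  # Called once per published message.
  def record_publish
    seq_no = @next_publish_seq_no
    @unconfirmed_set << seq_no
    @next_publish_seq_no += 1
    seq_no
  end

  # Called for every basic.ack or basic.nack received from the broker.
  def handle_ack_or_nack(delivery_tag, multiple, nack)
    confirmed = multiple ? @unconfirmed_set.select { |t| t <= delivery_tag } : [delivery_tag]
    confirmed.each { |t| @unconfirmed_set.delete(t) }
    @only_acks_received = false if nack
  end

  def outstanding?
    !@unconfirmed_set.empty?
  end

  # Reports whether only acks arrived since the last call, then resets the flag.
  def read_and_reset_only_acks_received
    value = @only_acks_received
    @only_acks_received = true
    value
  end
end

tracker = ConfirmTrackerSketch.new
3.times { tracker.record_publish }
tracker.handle_ack_or_nack(2, true, false)  # ack for messages 1 and 2
tracker.handle_ack_or_nack(3, false, true)  # nack for message 3

first_result  = tracker.read_and_reset_only_acks_received
second_result = tracker.read_and_reset_only_acks_received
```

In this model the first read reports the nack and the second read starts from a clean slate, which matches the "since the last time this method was called" wording above.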