Class: MarchHare::Channel

Inherits:
Object
  • Object
Defined in:
lib/march_hare/channel.rb

Overview

## Channels in RabbitMQ

To quote AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

## Opening Channels

Channels can be opened either via `MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly.

Either way, a channel id is allocated automatically.

## Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

## Higher-level API

MarchHare offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

### Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

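### Exchange Operations In Higher-level API

  • #exchange, #fanout, #direct and #headers are used to declare exchanges, and #default_exchange provides access to the default exchange. The rest of the API is in Exchange.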

## Channel QoS (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.

## Channel IDs

Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection. Channel ids are 16-bit integers in AMQP 0.9.1, so the upper bound is 65535, and the limit negotiated with the server may be lower. Allocating channels is very cheap on both client and server, so having tens, hundreds or even thousands of channels is possible.
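The id bookkeeping described in this section can be pictured with a small sketch. It is not MarchHare's allocator, which this page does not show. ChannelIdPool and its methods are hypothetical names used only for illustration, and the sketch is not thread-safe.

```ruby
# Hypothetical illustration only: this is NOT MarchHare's real allocator.
class ChannelIdPool
  def initialize(max)
    @max      = max
    @released = []  # ids handed back by closed channels
    @next_id  = 1   # id 0 is reserved for connection-level traffic
  end

  # Returns a free id, preferring ids released by closed channels.
  def allocate
    return @released.shift unless @released.empty?
    raise "no free channel ids" if @next_id > @max

    id = @next_id
    @next_id += 1
    id
  end

  # Makes the id available to channels opened later.
  def release(id)
    @released << id
  end
end

pool = ChannelIdPool.new(3)
a = pool.allocate
b = pool.allocate
pool.release(a)    # the channel holding id `a` was closed
c = pool.allocate  # reuses the released id instead of taking a new one
```

The point of the sketch is the reuse: an id released by a closed channel goes back to the pool and may be handed to a channel opened later.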

## Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying `channel.close` method information.

Examples:

Opening a channel

conn = MarchHare.connect

ch   = conn.create_channel

Closing a channel

ch  = conn.create_channel
ch.close

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue MarchHare::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "rubymarchhare.examples.recovery.q#{rand}"

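  # arguments are name, durable, exclusive, auto_delete
  ch2.queue_declare(q, false, false, false)
  # redeclaring with a different durable value fails with 406
  ch2.queue_declare(q, true, false, false)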
rescue MarchHare::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end


Defined Under Namespace

Classes: BlockConfirmListener, BlockReturnListener


Constructor Details

#initialize(session, delegate) ⇒ Channel

Returns a new instance of Channel.



# File 'lib/march_hare/channel.rb', line 121

def initialize(session, delegate)
  @connection = session
  @delegate   = delegate

  @exchanges      = JavaConcurrent::ConcurrentHashMap.new
  @queues         = JavaConcurrent::ConcurrentHashMap.new
  # we keep track of consumers in part to gracefully shut down their
  # executors when the channel is closed. This frees library users
  # from having to worry about this. MK.
  @consumers      = JavaConcurrent::ConcurrentHashMap.new
  @shutdown_hooks = Array.new
  @confirm_hooks  = Array.new
  @recoveries_counter = JavaConcurrent::AtomicInteger.new(0)

  on_shutdown do |ch, cause|
    ch.gracefully_shut_down_consumers
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



# File 'lib/march_hare/channel.rb', line 887

def method_missing(selector, *args)
  @delegate.__send__(selector, *args)
end
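The forwarding shown above can be tried in isolation with a minimal sketch. FakeDelegate, ForwardingChannel and the `ping` method are hypothetical stand-ins and are not part of MarchHare. The `respond_to_missing?` override is an addition that does not appear in the listing above.

```ruby
# FakeDelegate is a hypothetical stand-in for the underlying Java channel.
class FakeDelegate
  def ping
    :pong
  end
end

class ForwardingChannel
  def initialize(delegate)
    @delegate = delegate
  end

  # Same shape as the method_missing listing: unknown calls go to the delegate.
  def method_missing(selector, *args)
    @delegate.__send__(selector, *args)
  end

  # Not in the listing above; keeps respond_to? consistent with the forwarding.
  def respond_to_missing?(selector, include_private = false)
    @delegate.respond_to?(selector, include_private) || super
  end
end

ch     = ForwardingChannel.new(FakeDelegate.new)
answer = ch.ping  # ForwardingChannel has no #ping, so the delegate handles it
```

A call that neither object implements still raises NoMethodError, in this sketch from the delegate rather than from the wrapper.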

Instance Attribute Details

#consumers ⇒ Array<MarchHare::Consumer> (readonly)

Returns Consumers on this channel.

Returns:

  • (Array<MarchHare::Consumer>)

    Consumers on this channel



# File 'lib/march_hare/channel.rb', line 118

def consumers
  @consumers
end

#recoveries_counter ⇒ Object (readonly)

Returns the value of attribute recoveries_counter.



# File 'lib/march_hare/channel.rb', line 300

def recoveries_counter
  @recoveries_counter
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to acknowledge

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be acknowledged as well?

# File 'lib/march_hare/channel.rb', line 675

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end

#automatically_recover(session, java_connection) ⇒ Object



# File 'lib/march_hare/channel.rb', line 194

def automatically_recover(session, java_connection)
  logger.debug("channel: begin automatic connection recovery")

  jch = java_connection.create_channel(id)

  self.revive_with(jch)
  self.recover_shutdown_hooks

  self.recover_prefetch_setting
  self.recover_confirm_mode
  self.recover_confirm_hooks
  self.recover_tx_mode
  self.recover_exchanges
  # this includes bindings recovery
  self.recover_queues
  self.recover_consumers
  self.increment_recoveries_counter
end

#basic_ack(delivery_tag, multiple) ⇒ NilClass

Acknowledges one or more messages (deliveries).

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean)

    Should all deliveries up to this one be acknowledged?

Returns:

  • (NilClass)

    nil

# File 'lib/march_hare/channel.rb', line 761

def basic_ack(delivery_tag, multiple)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_ack(delivery_tag.to_i, multiple)
  end
end

#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object



# File 'lib/march_hare/channel.rb', line 627

def basic_consume(queue, auto_ack, consumer_tag = nil, consumer)
  consumer.auto_ack = auto_ack
  tag = converting_rjc_exceptions_to_ruby do
    if consumer_tag
      @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer)
    else
      @delegate.basic_consume(queue, auto_ack, consumer)
    end
  end
  self.register_consumer(tag, consumer)

  tag
end

#basic_get(queue, auto_ack) ⇒ Object



# File 'lib/march_hare/channel.rb', line 621

def basic_get(queue, auto_ack)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_get(queue, auto_ack)
  end
end

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be rejected/requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/march_hare/channel.rb', line 777

def basic_nack(delivery_tag, multiple = false, requeue = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Parameters:

  • exchange (String)

    Exchange to publish to

  • routing_key (String)

    Routing key

  • body (String)

    Message payload. It will never be modified by MarchHare or RabbitMQ in any way.

  • properties (Hash)

    Message properties

  • mandatory (Boolean)

    Should the message be returned to the publisher if it cannot be routed to any queue?

Options Hash (properties):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

# File 'lib/march_hare/channel.rb', line 615

def basic_publish(exchange, routing_key, mandatory, properties, body)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
  end
end

#basic_qos(prefetch_count) ⇒ Object



# File 'lib/march_hare/channel.rb', line 641

def basic_qos(prefetch_count)
  r = converting_rjc_exceptions_to_ruby do
    @delegate.basic_qos(prefetch_count)
  end
  @prefetch_count = prefetch_count

  r
end

#basic_recover(requeue = true) ⇒ Object

Redelivers unacknowledged messages.

Parameters:

  • requeue (Boolean) (defaults to: true)

    Should messages be requeued?

Returns:

  • RabbitMQ response



# File 'lib/march_hare/channel.rb', line 787

def basic_recover(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover(requeue)
  end
end

#basic_recover_async(requeue = true) ⇒ Object

Redelivers unacknowledged messages.

Parameters:

  • requeue (Boolean) (defaults to: true)

    Should messages be requeued?

Returns:

  • RabbitMQ response



# File 'lib/march_hare/channel.rb', line 797

def basic_recover_async(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover_async(requeue)
  end
end

#basic_reject(delivery_tag, requeue) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

conn  = MarchHare.connect

ch    = conn.create_channel
# a server-named, exclusive queue, declared for the sake of the example
q     = ch.queue("", :exclusive => true)
# manual acknowledgements are assumed here so that the delivery can be rejected
q.subscribe(:manual_ack => true) do |metadata, payload|
  # requeue the message
  ch.basic_reject(metadata.delivery_tag, true)
end

Reject a message

conn  = MarchHare.connect

ch    = conn.create_channel
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |metadata, payload|
  # reject the message without requeueing it
  ch.basic_reject(metadata.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = MarchHare.connect

ch    = conn.create_channel
# we assume the queue exists and has messages
# auto_ack is false so that the fetched delivery can still be rejected
response = ch.basic_get("bunny.examples.queue3", false)
ch.basic_reject(response.envelope.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/march_hare/channel.rb', line 748

def basic_reject(delivery_tag, requeue)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_reject(delivery_tag.to_i, requeue)
  end
end

#channel_number ⇒ Integer Also known as: id, number

Returns Channel id.

Returns:

  • (Integer)

    Channel id



# File 'lib/march_hare/channel.rb', line 153

def channel_number
  @delegate.channel_number
end

#close(code = 200, reason = "Goodbye") ⇒ Object

Closes the channel.

Closed channels can no longer be used. The id of a closed channel is returned to the pool of available ids and may be reused by a channel opened later.



# File 'lib/march_hare/channel.rb', line 169

def close(code = 200, reason = "Goodbye")
  v = @delegate.close(code, reason)

  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end

  @connection.unregister_channel(self)

  v
end

#confirm_select ⇒ NilClass

Enables publisher confirms on the channel.

Returns:

  • (NilClass)

    nil

# File 'lib/march_hare/channel.rb', line 810

def confirm_select
  converting_rjc_exceptions_to_ruby do
    @confirm_mode = true
    @delegate.confirm_select
  end
end

#converting_rjc_exceptions_to_ruby(&block) ⇒ Object

Executes a block, catches any Java exceptions thrown by the RabbitMQ Java client, converts them to Ruby exceptions and re-raises them.



# File 'lib/march_hare/channel.rb', line 992

def converting_rjc_exceptions_to_ruby(&block)
  begin
    block.call
  rescue Exception, java.lang.Throwable => e
    Exceptions.convert_and_reraise(e)
  end
end
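The catch, convert and re-raise pattern used by this method can be sketched in plain Ruby. Everything named below is a hypothetical stand-in: LowLevelError plays the role of a Java client exception, ChannelError and NotFoundError play the role of MarchHare's Ruby exceptions, and the mapping in convert_and_reraise is an assumption, not the logic of Exceptions.convert_and_reraise. Unlike the listing above, the sketch rescues StandardError only.

```ruby
# Hypothetical stand-in for an exception raised by the underlying client.
class LowLevelError < StandardError
  attr_reader :reply_code

  def initialize(reply_code, message)
    super(message)
    @reply_code = reply_code
  end
end

# Hypothetical stand-ins for the library's Ruby exceptions.
class ChannelError < StandardError; end
class NotFoundError < ChannelError; end

# Assumed mapping: 404 becomes NotFoundError, other codes ChannelError.
# Exceptions that are not LowLevelError are re-raised unchanged.
def convert_and_reraise(e)
  raise e unless e.is_a?(LowLevelError)

  klass = e.reply_code == 404 ? NotFoundError : ChannelError
  raise klass, e.message
end

# Runs the block and converts low-level failures into Ruby exceptions.
def converting_exceptions
  yield
rescue StandardError => e
  convert_and_reraise(e)
end

result = begin
  converting_exceptions { raise LowLevelError.new(404, "NOT_FOUND - no queue 'q'") }
rescue NotFoundError => e
  "converted: #{e.message}"
end
```

Callers then rescue only the library's own exception classes and never need to know which exception the underlying client raised.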

#default_exchange ⇒ Object

Provides access to the default exchange



# File 'lib/march_hare/channel.rb', line 412

def default_exchange
  @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
end

#deregister_exchange(exchange) ⇒ Object



# File 'lib/march_hare/channel.rb', line 958

def deregister_exchange(exchange)
  logger.debug("channel: deregister exchange #{exchange.name}")
  @exchanges.delete(exchange.name)
end

#deregister_queue(queue) ⇒ Object



# File 'lib/march_hare/channel.rb', line 936

def deregister_queue(queue)
  logger.debug("channel: deregister queue #{queue.name}")
  @queues.delete(queue.name)
end

#deregister_queue_named(name) ⇒ Object



# File 'lib/march_hare/channel.rb', line 942

def deregister_queue_named(name)
  @queues.delete(name)
end

#direct(name, opts = {}) ⇒ MarchHare::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (MarchHare::Exchange)

    Exchange instance

# File 'lib/march_hare/channel.rb', line 360

def direct(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#exchange(name, options = {}) ⇒ MarchHare::Exchange

Declares an exchange of the given type (direct by default) or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • options (Hash) (defaults to: {})

    Exchange parameters

Options Hash (options):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or “x-consistent-hash”

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (MarchHare::Exchange)

    Exchange instance

# File 'lib/march_hare/channel.rb', line 318

def exchange(name, options={})
  dx = Exchange.new(self, name, options).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object

Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)

Parameters:

  • destination (String)

    Destination exchange name

  • source (String)

    Source exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 444

def exchange_bind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_bind(destination, source, routing_key, arguments)
  end
end

#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object

Declares an exchange using exchange.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • type (String)

    Exchange type, e.g. "direct" or "fanout"

  • durable (Boolean) (defaults to: false)

    (false) Should information about this exchange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) (defaults to: false)

    (false) Should this exchange be deleted when it is no longer used?

  • internal (Boolean) (defaults to: false)

    (false) Should this exchange be internal, that is, usable only as the destination of exchange-to-exchange bindings and not for publishing by clients?

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 427

def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
  end
end

#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object

Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)

Parameters:

  • destination (String)

    Destination exchange name

  • source (String)

    Source exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 461

def exchange_unbind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_unbind(destination, source, routing_key, arguments)
  end
end

#fanout(name, opts = {}) ⇒ MarchHare::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (MarchHare::Exchange)

    Exchange instance

# File 'lib/march_hare/channel.rb', line 339

def fanout(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#find_queue(name) ⇒ Object



# File 'lib/march_hare/channel.rb', line 953

def find_queue(name)
  @queues[name]
end

#gracefully_shut_down_consumers ⇒ Object



# File 'lib/march_hare/channel.rb', line 982

def gracefully_shut_down_consumers
  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end
end

#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object



# File 'lib/march_hare/channel.rb', line 1001

def guarding_against_stale_delivery_tags(tag, &block)
  case tag
  # if an Integer was passed, execute unconditionally. MK.
  when Integer then
    block.call
  # versioned delivery tags should be checked to avoid
  # sending out stale (invalid) tags after channel was reopened
  # during network failure recovery. MK.
  when VersionedDeliveryTag then
    if !tag.stale?(@recoveries_counter.get)
      block.call
    end
  end
end
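The guard above can be exercised without a broker using a minimal sketch. VersionedTag and TagGuard are hypothetical stand-ins. In particular, the assumption that a tag is stale when its recorded version is lower than the current recoveries counter is an illustration, not a statement about how VersionedDeliveryTag is implemented.

```ruby
# Hypothetical stand-in for VersionedDeliveryTag: a delivery tag plus the
# value of the recoveries counter at the time the delivery was received.
VersionedTag = Struct.new(:tag, :version) do
  # Assumed rule: stale if issued before the most recent recovery.
  def stale?(current_version)
    version < current_version
  end

  def to_i
    tag
  end
end

class TagGuard
  attr_reader :acked

  def initialize
    @recoveries = 0
    @acked      = []
  end

  # Simulates one channel recovery after a network failure.
  def recover!
    @recoveries += 1
  end

  # Wraps a raw tag with the current recoveries counter.
  def wrap(tag)
    VersionedTag.new(tag, @recoveries)
  end

  # Mirrors the case statement above: plain Integers always go through,
  # versioned tags only if they are not stale.
  def ack(tag)
    case tag
    when Integer
      @acked << tag
    when VersionedTag
      @acked << tag.to_i unless tag.stale?(@recoveries)
    end
  end
end

guard   = TagGuard.new
old_tag = guard.wrap(1)   # received before the failure
guard.recover!            # channel reopened, delivery tags start over
guard.ack(old_tag)        # silently skipped: the tag is stale
guard.ack(guard.wrap(1))  # a fresh tag with the same number goes through
guard.ack(7)              # plain Integer: no check is possible
```

Note that a stale tag is dropped silently rather than raising, which matches the listing above: the block is simply not called.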

#headers(name, opts = {}) ⇒ MarchHare::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (MarchHare::Exchange)

    Exchange instance

# File 'lib/march_hare/channel.rb', line 402

def headers(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#increment_recoveries_counter ⇒ Object



# File 'lib/march_hare/channel.rb', line 296

def increment_recoveries_counter
  @recoveries_counter.increment_and_get
end

#logger ⇒ ::Logger

Returns Logger instance from the connection.

Returns:

  • (::Logger)

    Logger instance from the connection



# File 'lib/march_hare/channel.rb', line 148

def logger
  @connection.logger
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be rejected as well?

  • requeue (Boolean) (defaults to: false)

    (false) Should this message be requeued instead of dropping it?

# File 'lib/march_hare/channel.rb', line 705

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#next_publisher_seq_no ⇒ Object



# File 'lib/march_hare/channel.rb', line 841

def next_publisher_seq_no
  @delegate.next_publisher_seq_no
end

#on_confirm(&block) ⇒ Object

Defines a publisher confirm handler



# File 'lib/march_hare/channel.rb', line 881

def on_confirm(&block)
  ch = BlockConfirmListener.from(block)
  self.add_confirm_listener(ch)
  @confirm_hooks << ch
end

#on_return(&block) ⇒ Object

Defines a returned message handler.



# File 'lib/march_hare/channel.rb', line 875

def on_return(&block)
  self.add_return_listener(BlockReturnListener.from(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.



# File 'lib/march_hare/channel.rb', line 184

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)

  @shutdown_hooks << sh
  @delegate.add_shutdown_listener(sh)

  sh
end

#open? ⇒ Boolean

Returns true if the channel is open.

Returns:

  • (Boolean)

    true if the channel is open



# File 'lib/march_hare/channel.rb', line 160

def open?
  @delegate.open?
end

#prefetch ⇒ Integer

Returns the active basic.qos prefetch setting.

Returns:

  • (Integer)

    The active basic.qos prefetch setting (0 if none has been set)



# File 'lib/march_hare/channel.rb', line 665

def prefetch
  @prefetch_count || 0
end

#prefetch=(n) ⇒ Object

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.

Parameters:

  • n (Integer)

    Prefetch (QoS setting) for this channel

# File 'lib/march_hare/channel.rb', line 660

def prefetch=(n)
  basic_qos(n)
end

#qos(options = {}) ⇒ Object



# File 'lib/march_hare/channel.rb', line 650

def qos(options={})
  basic_qos(options.fetch(:prefetch_count, 0))
end
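How #qos, #prefetch=, #prefetch and #recover_prefetch_setting fit together can be seen in a small sketch that copies their bodies from the listings on this page and swaps the Java channel for a recording stand-in. RecordingDelegate and PrefetchSketch are hypothetical names, and the one-argument form of revive_with is an assumption based on how it is called in #automatically_recover.

```ruby
# Hypothetical stand-in for the Java channel: records basic.qos calls.
class RecordingDelegate
  attr_reader :qos_calls

  def initialize
    @qos_calls = []
  end

  def basic_qos(prefetch_count)
    @qos_calls << prefetch_count
    nil
  end
end

class PrefetchSketch
  def initialize(delegate)
    @delegate = delegate
  end

  # Sends the setting to the delegate and remembers it for recovery.
  def basic_qos(prefetch_count)
    r = @delegate.basic_qos(prefetch_count)
    @prefetch_count = prefetch_count
    r
  end

  def qos(options = {})
    basic_qos(options.fetch(:prefetch_count, 0))
  end

  def prefetch=(n)
    basic_qos(n)
  end

  def prefetch
    @prefetch_count || 0
  end

  # Assumed shape: swaps in the channel opened after a network failure.
  def revive_with(delegate)
    @delegate = delegate
  end

  # Re-applies the remembered setting, if one was ever set.
  def recover_prefetch_setting
    basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
  end
end

ch      = PrefetchSketch.new(RecordingDelegate.new)
initial = ch.prefetch            # nothing set yet
ch.prefetch = 10
ch.qos(:prefetch_count => 25)    # the later setting wins

fresh = RecordingDelegate.new
ch.revive_with(fresh)
ch.recover_prefetch_setting      # only the last value is replayed
```

Because only the most recent value is cached, recovery replays a single basic.qos call on the new channel rather than the whole history.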

#queue(name, options = {}) ⇒ MarchHare::Queue

Declares a queue or looks it up in the per-channel cache.

Parameters:

  • name (String)

    Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).

  • options (Hash) (defaults to: {})

    Queue properties and other options

Options Hash (options):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • (MarchHare::Queue)

    Queue instance

# File 'lib/march_hare/channel.rb', line 485

def queue(name, options={})
  dq = Queue.new(self, name, options).tap do |q|
    q.declare!
  end

  self.register_queue(dq)
end

#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method

Parameters:

  • queue (String)

    Queue name

  • exchange (String)

    Exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 552

def queue_bind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_bind(queue, exchange, routing_key, arguments)
  end
end

#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object

Declares a queue using queue.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • durable (Boolean)

    Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • exclusive (Boolean)

    Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed.

  • auto_delete (Boolean)

    Should this queue be deleted when the last consumer is cancelled?

  • arguments (Hash) (defaults to: {})

    Optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 508

def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
  end
end

#queue_declare_passive(name) ⇒ Object

Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.

Parameters:

  • name (String)

    Queue name

# File 'lib/march_hare/channel.rb', line 520

def queue_declare_passive(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare_passive(name)
  end
end

#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object

Deletes a queue using queue.delete AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • if_empty (Boolean) (defaults to: false)

    (false) Should this queue be deleted only if it has no messages?

  • if_unused (Boolean) (defaults to: false)

    (false) Should this queue be deleted only if it has no consumers?

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 535

def queue_delete(name, if_empty = false, if_unused = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_delete(name, if_empty, if_unused)
  end
end

#queue_purge(name) ⇒ Object

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 581

def queue_purge(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_purge(name)
  end
end

#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method

Parameters:

  • queue (String)

    Queue name

  • exchange (String)

    Exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

# File 'lib/march_hare/channel.rb', line 569

def queue_unbind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_unbind(queue, exchange, routing_key, arguments)
  end
end

#recover_confirm_hooks ⇒ Object



# File 'lib/march_hare/channel.rb', line 226

def recover_confirm_hooks
  @confirm_hooks.each do |ch|
    @delegate.add_confirm_listener(ch)
  end
end

#recover_confirm_mode ⇒ Object

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 241

def recover_confirm_mode
  confirm_select if defined?(@confirm_mode) && @confirm_mode
end

#recover_consumers ⇒ Object

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 282

def recover_consumers
  @consumers.values.each do |c|
    begin
      logger.debug("channel: recover consumer #{c.consumer_tag}")
      self.unregister_consumer(c.consumer_tag)
      c.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering consumer #{c.consumer_tag}")
      logger.error(e)
    end
  end
end

#recover_exchanges ⇒ Object

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 254

def recover_exchanges
  @exchanges.values.each do |x|
    begin
      logger.debug("channel: recover exchange #{x.name}")
      x.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering exchange #{x.name}")
      logger.error(e)
    end
  end
end
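The recovery loops on this page (exchanges, queues, consumers) share one shape: each entity is recovered inside its own begin/rescue, so a failure is logged and the loop moves on. A minimal sketch of that shape follows. Entity and recover_all are hypothetical names, an Array stands in for the logger, and the sketch rescues StandardError where the listing above rescues Exception.

```ruby
# Hypothetical stand-in for an exchange or queue that can recover itself.
Entity = Struct.new(:name, :broken) do
  attr_reader :recovered

  def recover_from_network_failure
    raise "cannot redeclare #{name}" if broken

    @recovered = true
  end
end

# Recovers every entity; a failure in one does not stop the others.
def recover_all(entities, log)
  entities.each do |x|
    begin
      log << "recover #{x.name}"
      x.recover_from_network_failure
    rescue StandardError => e
      log << "Caught exception when recovering #{x.name}: #{e.message}"
    end
  end
end

log      = []
entities = [Entity.new("a", false), Entity.new("b", true), Entity.new("c", false)]
recover_all(entities, log)

recovered = entities.select(&:recovered).map(&:name)
```

In this sketch entity "b" fails, yet "c" is still recovered, which is the reason for rescuing inside the loop rather than around it.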

#recover_prefetch_setting ⇒ Object

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 235

def recover_prefetch_setting
  basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
end

#recover_queues ⇒ Object

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 268

def recover_queues
  @queues.values.each do |q|
    begin
      logger.debug("channel: recover queue #{q.name}")
      q.recover_from_network_failure
    rescue Exception => e
      logger.error("Caught exception when recovering queue #{q.name}")
      logger.error(e)
    end
  end
end

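#recover_shutdown_hooks ⇒ Object

Re-adds the registered shutdown listeners to the underlying channel. Used by the Automatic Network Failure Recovery feature.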



# File 'lib/march_hare/channel.rb', line 219

def recover_shutdown_hooks
  @shutdown_hooks.each do |sh|
    @delegate.add_shutdown_listener(sh)
  end
end

#recover_tx_mode ⇒ Object

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/march_hare/channel.rb', line 247

def recover_tx_mode
  tx_select if defined?(@tx_mode) && @tx_mode
end
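The `defined?(@tx_mode) && @tx_mode` guard replays `tx_select` only when the channel had been put into transaction mode before the failure; `#tx_select` in this class is what sets `@tx_mode = true`. A minimal sketch of that bookkeeping on a hypothetical stand-in class (`TxModeSketch` is not part of March Hare, and its counter stands in for the call to the underlying channel):

```ruby
# Hypothetical stand-in, not March Hare code. It mirrors only the
# @tx_mode bookkeeping of tx_select and recover_tx_mode.
class TxModeSketch
  attr_reader :select_calls

  def initialize
    @select_calls = 0 # counts how often tx.select would be sent
  end

  def tx_select
    @tx_mode = true
    @select_calls += 1
  end

  # Re-sends tx.select only if the mode was enabled earlier.
  def recover_tx_mode
    tx_select if defined?(@tx_mode) && @tx_mode
  end
end

untouched = TxModeSketch.new
untouched.recover_tx_mode       # no-op: @tx_mode was never set

transactional = TxModeSketch.new
transactional.tx_select         # enabled before the connection dropped
transactional.recover_tx_mode   # replayed during recovery

puts untouched.select_calls     # prints 0
puts transactional.select_calls # prints 2
```

`#recover_confirm_mode` and `#recover_prefetch_setting` use the same guard with `@confirm_mode` and `@prefetch_count`.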

#register_consumer(consumer_tag, consumer) ⇒ Object



# File 'lib/march_hare/channel.rb', line 970

def register_consumer(consumer_tag, consumer)
  logger.debug("channel: register consumer #{consumer_tag}")
  @consumers[consumer_tag] = consumer
end

#register_exchange(exchange) ⇒ Object



# File 'lib/march_hare/channel.rb', line 964

def register_exchange(exchange)
  logger.debug("channel: register exchange #{exchange.name}")
  @exchanges[exchange.name] = exchange
end

#register_queue(queue) ⇒ Object



# File 'lib/march_hare/channel.rb', line 947

def register_queue(queue)
  logger.debug("channel: register queue #{queue.name}")
  @queues[queue.name] = queue
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of dropped?




# File 'lib/march_hare/channel.rb', line 690

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end
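One common way to use the `requeue` flag is to requeue a failed message once and drop it if it fails again. The sketch below illustrates that policy; `process_delivery` and `RecordingChannel` are hypothetical names, not part of March Hare, and the stand-in channel only records calls so the example runs without a broker. With a real channel, `channel` would be a `MarchHare::Channel`.

```ruby
# Hypothetical helper; works with any object that responds to
# ack(delivery_tag) and reject(delivery_tag, requeue).
def process_delivery(channel, delivery_tag, redelivered)
  yield
  channel.ack(delivery_tag)
  :acked
rescue StandardError
  # Requeue on the first failure; drop the message if it was
  # already redelivered, to avoid an endless redelivery loop.
  requeue = !redelivered
  channel.reject(delivery_tag, requeue)
  requeue ? :requeued : :dropped
end

# Recording stand-in for a channel (an assumption for illustration).
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack(delivery_tag, multiple = false)
    @calls << [:ack, delivery_tag]
  end

  def reject(delivery_tag, requeue = false)
    @calls << [:reject, delivery_tag, requeue]
  end
end

ch = RecordingChannel.new
process_delivery(ch, 1, false) { :ok }           # acknowledged
process_delivery(ch, 2, false) { raise "boom" }  # rejected and requeued
process_delivery(ch, 3, true)  { raise "boom" }  # rejected and dropped
```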

#revive_with(java_ch) ⇒ Object



# File 'lib/march_hare/channel.rb', line 214

def revive_with(java_ch)
  @delegate = java_ch
end

#session ⇒ MarchHare::Session

Also known as: client, connection

Returns the connection (session) this channel is on.

Returns:

  • (MarchHare::Session)



# File 'lib/march_hare/channel.rb', line 141

def session
  @connection
end

#topic(name, opts = {}) ⇒ MarchHare::Exchange

Declares a topic exchange and registers it in the channel's cache of declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (MarchHare::Exchange)




# File 'lib/march_hare/channel.rb', line 381

def topic(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end
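Because the method builds its options with `opts.merge(:type => "topic")`, any `:type` supplied by the caller is overridden, and the caller's hash is left unchanged. A minimal sketch of just that option handling (`topic_exchange_options` is a hypothetical helper, not a March Hare method):

```ruby
# Hypothetical helper that isolates the option handling used by #topic.
def topic_exchange_options(opts = {})
  # Hash#merge returns a new hash; on a key collision the argument wins.
  opts.merge(:type => "topic")
end

caller_opts = { :durable => true, :type => "fanout" }
effective   = topic_exchange_options(caller_opts)

puts effective[:type]    # prints topic
puts caller_opts[:type]  # prints fanout
```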

#tx_commit ⇒ Object

Commits a transaction.



# File 'lib/march_hare/channel.rb', line 860

def tx_commit
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_commit
  end
end

#tx_rollback ⇒ Object

Rolls back a transaction.



# File 'lib/march_hare/channel.rb', line 867

def tx_rollback
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_rollback
  end
end

#tx_select ⇒ Object

Enables transactions on the channel.



# File 'lib/march_hare/channel.rb', line 846

def tx_select
  converting_rjc_exceptions_to_ruby do
    @tx_mode = true
    @delegate.tx_select
  end
end
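The three transaction methods are typically used together: select, do the work, then commit, rolling back if the work fails. A minimal sketch of that pattern follows; `in_transaction` and `TxRecordingChannel` are hypothetical names introduced for illustration, and the stand-in channel only records calls so the example runs without a broker.

```ruby
# Hypothetical helper; `channel` is any object responding to
# tx_select, tx_commit and tx_rollback.
def in_transaction(channel)
  channel.tx_select
  begin
    result = yield channel
    channel.tx_commit
    result
  rescue StandardError
    # Undo everything done since tx_select, then re-raise for the caller.
    channel.tx_rollback
    raise
  end
end

# Recording stand-in for a channel (an assumption for illustration).
class TxRecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def tx_select
    @calls << :tx_select
  end

  def tx_commit
    @calls << :tx_commit
  end

  def tx_rollback
    @calls << :tx_rollback
  end
end

ok = TxRecordingChannel.new
in_transaction(ok) { |ch| :published }
# ok.calls is [:tx_select, :tx_commit]

failing = TxRecordingChannel.new
begin
  in_transaction(failing) { |ch| raise "publish failed" }
rescue RuntimeError
  # failing.calls is [:tx_select, :tx_rollback]
end
```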

#unregister_consumer(consumer_tag) ⇒ Object



# File 'lib/march_hare/channel.rb', line 976

def unregister_consumer(consumer_tag)
  logger.debug("channel: unregister consumer #{consumer_tag}")
  @consumers.delete(consumer_tag)
end

#using_publisher_confirms? ⇒ Boolean

Also known as: uses_publisher_confirms?

Returns true if publisher confirms are enabled for this channel.

Returns:

  • (Boolean)

    true if publisher confirms are enabled for this channel



# File 'lib/march_hare/channel.rb', line 818

def using_publisher_confirms?
  !!@confirm_mode
end

#using_tx? ⇒ Boolean

Also known as: uses_tx?

Returns true if transactions are enabled for this channel.

Returns:

  • (Boolean)

    true if transactions are enabled for this channel



# File 'lib/march_hare/channel.rb', line 854

def using_tx?
  !!@tx_mode
end

#wait_for_confirms(timeout = nil) ⇒ Boolean

Waits until all outstanding publisher confirms arrive.

Takes an optional timeout in milliseconds. Raises an exception if the timeout elapses before all confirms arrive.

Parameters:

  • timeout (Integer) (defaults to: nil)

    Timeout in milliseconds

Returns:

  • (Boolean)

    true if all confirms were positive, false if some were negative



# File 'lib/march_hare/channel.rb', line 831

def wait_for_confirms(timeout = nil)
  if timeout
    converting_rjc_exceptions_to_ruby do
      @delegate.wait_for_confirms(timeout)
    end
  else
    @delegate.wait_for_confirms
  end
end
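A typical use is to publish a batch and then block until the broker has confirmed all of it. The sketch below shows that flow under stated assumptions: `publish_with_confirms`, `ConfirmingChannelStub` and `ExchangeStub` are hypothetical names, and the stubs replace a real channel and exchange so the example runs without a broker. The 5000 ms default timeout is an arbitrary choice, not a library default.

```ruby
# Hypothetical helper; `channel` responds to confirm_select,
# using_publisher_confirms? and wait_for_confirms, and `exchange`
# responds to publish(body).
def publish_with_confirms(channel, exchange, payloads, timeout_ms = 5_000)
  channel.confirm_select unless channel.using_publisher_confirms?
  payloads.each { |body| exchange.publish(body) }
  # true if every confirm was positive, false if any was negative
  channel.wait_for_confirms(timeout_ms)
end

# Stand-ins for illustration only.
class ConfirmingChannelStub
  def initialize(outcome)
    @outcome = outcome
    @confirm_mode = false
  end

  def confirm_select
    @confirm_mode = true
  end

  def using_publisher_confirms?
    !!@confirm_mode
  end

  def wait_for_confirms(timeout = nil)
    @outcome
  end
end

class ExchangeStub
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, opts = {})
    @published << body
  end
end

channel  = ConfirmingChannelStub.new(true)
exchange = ExchangeStub.new
all_confirmed = publish_with_confirms(channel, exchange, ["a", "b", "c"])
# all_confirmed is true; a false result means some messages were nacked
# and the caller should republish or report them
```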