Class: MarchHare::Channel
- Inherits: Object (Object > MarchHare::Channel)
- Defined in: lib/march_hare/channel.rb
Overview
## Channels in RabbitMQ
To quote AMQP 0.9.1 specification:
AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
## Opening Channels
Channels can be opened either via `MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly.
This will automatically allocate a channel id.
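A minimal sketch of the common path, assuming JRuby with the march_hare gem installed and a RabbitMQ node reachable with default settings. The helper name `open_channel_example` is hypothetical; it only wraps the calls so that nothing connects until the method is invoked.

```ruby
# Hypothetical helper: opens a connection and a channel, yields the channel,
# then closes both. Needs JRuby, the march_hare gem and a running broker.
def open_channel_example
  require "march_hare"

  conn = MarchHare.connect     # default host, port and credentials
  ch   = conn.create_channel   # the channel id is allocated automatically
  yield ch if block_given?
ensure
  # Clean up whatever was opened, even if the block raised.
  ch.close if ch && ch.open?
  conn.close if conn
end
```

Calling `open_channel_example { |ch| puts ch.id }` would print the allocated channel id under those assumptions.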
## Closing Channels
Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.
## Higher-level API
MarchHare offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).
### Queue Operations In Higher-level API
### Exchange Operations In Higher-level API
-
#topic declares a topic exchange. The rest of the API is in Exchange.
-
#direct declares a direct exchange.
-
#fanout declares a fanout exchange.
-
#headers declares a headers exchange.
-
#exchange is used to declare exchanges with type specified as a symbol or string.
## Channel Qos (Prefetch Level)
It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
## Channel IDs
Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.
There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.
## Channels and Error Handling
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying `channel.close` method information.
Defined Under Namespace
Classes: BlockConfirmListener, BlockReturnListener
Instance Attribute Summary
-
#consumers ⇒ Array<MarchHare::Consumer>
readonly
Consumers on this channel.
-
#recoveries_counter ⇒ Object
readonly
Returns the value of attribute recoveries_counter.
Exchanges
-
#default_exchange ⇒ Object
Provides access to the default exchange.
-
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
-
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
-
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
-
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares an exchange using exchange.declare AMQP 0.9.1 method.
-
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
-
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
-
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
Queues
-
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
-
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
-
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
-
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method.
-
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method.
-
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
-
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
basic.*
-
#ack(delivery_tag, multiple = false) ⇒ Object
(also: #acknowledge)
Acknowledges a message.
-
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
- #basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
- #basic_get(queue, auto_ack) ⇒ Object
-
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
-
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
- #basic_qos(prefetch_count) ⇒ Object
-
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
-
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message.
-
#prefetch ⇒ Integer
Active basic.qos prefetch setting.
-
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
- #qos(options = {}) ⇒ Object
-
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message.
Instance Method Summary
- #automatically_recover(session, java_connection) ⇒ Object
-
#channel_number ⇒ Integer
(also: #id, #number)
Channel id.
-
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
-
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
-
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
- #deregister_exchange(exchange) ⇒ Object
- #deregister_queue(queue) ⇒ Object
- #deregister_queue_named(name) ⇒ Object
- #find_queue(name) ⇒ Object
- #gracefully_shut_down_consumers ⇒ Object
- #guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
- #increment_recoveries_counter ⇒ Object
-
#initialize(session, delegate) ⇒ Channel
constructor
A new instance of Channel.
-
#logger ⇒ ::Logger
Logger instance from the connection.
- #method_missing(selector, *args) ⇒ Object
- #next_publisher_seq_no ⇒ Object
-
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler.
-
#on_return(&block) ⇒ Object
Defines a returned message handler.
-
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback.
-
#open? ⇒ Boolean
True if the channel is open.
- #recover_confirm_hooks ⇒ Object
-
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode.
-
#recover_consumers ⇒ Object
Recovers consumers.
-
#recover_exchanges ⇒ Object
Recovers exchanges.
-
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting.
-
#recover_queues ⇒ Object
Recovers queues and bindings.
- #recover_shutdown_hooks ⇒ Object
-
#recover_tx_mode ⇒ Object
Recovers transaction mode.
- #register_consumer(consumer_tag, consumer) ⇒ Object
- #register_exchange(exchange) ⇒ Object
- #register_queue(queue) ⇒ Object
- #revive_with(java_ch) ⇒ Object
-
#session ⇒ MarchHare::Session
(also: #client, #connection)
Connection this channel is on.
-
#tx_commit ⇒ Object
Commits a transaction.
-
#tx_rollback ⇒ Object
Rolls back a transaction.
-
#tx_select ⇒ Object
Enables transactions on the channel.
- #unregister_consumer(consumer_tag) ⇒ Object
-
#using_publisher_confirms? ⇒ Boolean
(also: #uses_publisher_confirms?)
True if publisher confirms are enabled for this channel.
-
#using_tx? ⇒ Boolean
(also: #uses_tx?)
True if transactions are enabled for this channel.
-
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Constructor Details
#initialize(session, delegate) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/march_hare/channel.rb', line 121
    def initialize(session, delegate)
      @connection = session
      @delegate = delegate

      @exchanges = JavaConcurrent::ConcurrentHashMap.new
      @queues = JavaConcurrent::ConcurrentHashMap.new
      # we keep track of consumers in part to gracefully shut down their
      # executors when the channel is closed. This frees library users
      # from having to worry about this. MK.
      @consumers = JavaConcurrent::ConcurrentHashMap.new
      @shutdown_hooks = Array.new
      @confirm_hooks = Array.new
      @recoveries_counter = JavaConcurrent::AtomicInteger.new(0)

      on_shutdown do |ch, cause|
        ch.gracefully_shut_down_consumers
      end
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 887
    def method_missing(selector, *args)
      @delegate.__send__(selector, *args)
    end
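The forwarding idea can be sketched in plain Ruby. `FakeDelegate` and `ForwardingChannel` are hypothetical stand-ins, not MarchHare classes. Unlike the listing above, the sketch also checks `respond_to?` and defines `respond_to_missing?`, which is a common Ruby idiom rather than something the source does.

```ruby
# Stand-in for the underlying Java channel (hypothetical, for illustration only).
class FakeDelegate
  def channel_number
    7
  end

  def basic_cancel(consumer_tag)
    "cancelled #{consumer_tag}"
  end
end

# Minimal wrapper that forwards unknown messages to its delegate.
class ForwardingChannel
  def initialize(delegate)
    @delegate = delegate
  end

  # Forward anything this class does not define itself.
  def method_missing(selector, *args, &block)
    if @delegate.respond_to?(selector)
      @delegate.__send__(selector, *args, &block)
    else
      super
    end
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(selector, include_private = false)
    @delegate.respond_to?(selector, include_private) || super
  end
end

ch = ForwardingChannel.new(FakeDelegate.new)
ch.channel_number        # => 7
ch.basic_cancel("tag-1") # => "cancelled tag-1"
```

Messages that neither the wrapper nor the delegate understands still end in `NoMethodError` here.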
Instance Attribute Details
#consumers ⇒ Array<MarchHare::Consumer> (readonly)
Returns Consumers on this channel.
    # File 'lib/march_hare/channel.rb', line 118
    def consumers
      @consumers
    end
#recoveries_counter ⇒ Object (readonly)
Returns the value of attribute recoveries_counter.
    # File 'lib/march_hare/channel.rb', line 300
    def recoveries_counter
      @recoveries_counter
    end
Instance Method Details
#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge
Acknowledges a message. Acknowledged messages are completely removed from the queue.
    # File 'lib/march_hare/channel.rb', line 675
    def ack(delivery_tag, multiple = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_ack(delivery_tag.to_i, multiple)
      end
    end
#automatically_recover(session, java_connection) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 194
    def automatically_recover(session, java_connection)
      logger.debug("channel: begin automatic connection recovery")

      jch = java_connection.create_channel(id)

      self.revive_with(jch)
      self.recover_shutdown_hooks

      self.recover_prefetch_setting
      self.recover_confirm_mode
      self.recover_confirm_hooks
      self.recover_tx_mode
      self.recover_exchanges
      # this includes bindings recovery
      self.recover_queues
      self.recover_consumers
      self.increment_recoveries_counter
    end
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
    # File 'lib/march_hare/channel.rb', line 761
    def basic_ack(delivery_tag, multiple)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_ack(delivery_tag.to_i, multiple)
      end
    end
#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 627
    def basic_consume(queue, auto_ack, consumer_tag = nil, consumer)
      consumer.auto_ack = auto_ack
      tag = converting_rjc_exceptions_to_ruby do
        if consumer_tag
          @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer)
        else
          @delegate.basic_consume(queue, auto_ack, consumer)
        end
      end
      self.register_consumer(tag, consumer)

      tag
    end
#basic_get(queue, auto_ack) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 621
    def basic_get(queue, auto_ack)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_get(queue, auto_ack)
      end
    end
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
    # File 'lib/march_hare/channel.rb', line 777
    def basic_nack(delivery_tag, multiple = false, requeue = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 615
    def basic_publish(exchange, routing_key, mandatory, properties, body)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
      end
    end
#basic_qos(prefetch_count) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 641
    def basic_qos(prefetch_count)
      r = converting_rjc_exceptions_to_ruby do
        @delegate.basic_qos(prefetch_count)
      end
      @prefetch_count = prefetch_count

      r
    end
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages
    # File 'lib/march_hare/channel.rb', line 787
    def basic_recover(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover(requeue)
      end
    end
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages
    # File 'lib/march_hare/channel.rb', line 797
    def basic_recover_async(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover_async(requeue)
      end
    end
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
    # File 'lib/march_hare/channel.rb', line 748
    def basic_reject(delivery_tag, requeue)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_reject(delivery_tag.to_i, requeue)
      end
    end
#channel_number ⇒ Integer Also known as: id, number
Returns Channel id.
    # File 'lib/march_hare/channel.rb', line 153
    def channel_number
      @delegate.channel_number
    end
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.
    # File 'lib/march_hare/channel.rb', line 169
    def close(code = 200, reason = "Goodbye")
      v = @delegate.close(code, reason)

      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end

      @connection.unregister_channel(self)

      v
    end
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
    # File 'lib/march_hare/channel.rb', line 810
    def confirm_select
      converting_rjc_exceptions_to_ruby do
        @confirm_mode = true
        @delegate.confirm_select
      end
    end
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
    # File 'lib/march_hare/channel.rb', line 992
    def converting_rjc_exceptions_to_ruby(&block)
      begin
        block.call
      rescue Exception, java.lang.Throwable => e
        Exceptions.convert_and_reraise(e)
      end
    end
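The wrap-and-reraise pattern can be illustrated without the Java client. This is a sketch under stated assumptions: `LowLevelError`, `ChannelLevelError`, `NotFoundError`, `convert_and_reraise` and `converting_exceptions` are hypothetical names, and the mapping on reply code 404 is only an example of how a converter might choose a class. The sketch rescues `StandardError`, whereas the listing above rescues `Exception` and `java.lang.Throwable`.

```ruby
# Hypothetical low-level error, standing in for a Java client exception.
class LowLevelError < StandardError
  attr_reader :reply_code

  def initialize(reply_code, message)
    super(message)
    @reply_code = reply_code
  end
end

# Hypothetical Ruby-side error classes.
class ChannelLevelError < StandardError; end
class NotFoundError < ChannelLevelError; end

# Maps a low-level error to a Ruby exception class and raises it;
# anything else is re-raised unchanged.
def convert_and_reraise(error)
  case error
  when LowLevelError
    klass = error.reply_code == 404 ? NotFoundError : ChannelLevelError
    raise klass, error.message
  else
    raise error
  end
end

# Runs the block and funnels every failure through the converter.
def converting_exceptions
  yield
rescue StandardError => e
  convert_and_reraise(e)
end
```

Callers then rescue the Ruby classes only, for example `rescue NotFoundError`, and never see the low-level type.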
#default_exchange ⇒ Object
Provides access to the default exchange
    # File 'lib/march_hare/channel.rb', line 412
    def default_exchange
      @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
    end
#deregister_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 958
    def deregister_exchange(exchange)
      logger.debug("channel: deregister exchange #{exchange.name}")
      @exchanges.delete(exchange.name)
    end
#deregister_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 936
    def deregister_queue(queue)
      logger.debug("channel: deregister queue #{queue.name}")
      @queues.delete(queue.name)
    end
#deregister_queue_named(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 942
    def deregister_queue_named(name)
      @queues.delete(name)
    end
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 360
    def direct(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 318
    def exchange(name, options = {})
      dx = Exchange.new(self, name, options).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)
    # File 'lib/march_hare/channel.rb', line 444
    def exchange_bind(destination, source, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.exchange_bind(destination, source, routing_key, arguments)
      end
    end
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares an exchange using exchange.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 427
    def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
      end
    end
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)
    # File 'lib/march_hare/channel.rb', line 461
    def exchange_unbind(destination, source, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.exchange_unbind(destination, source, routing_key, arguments)
      end
    end
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 339
    def fanout(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#find_queue(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 953
    def find_queue(name)
      @queues[name]
    end
#gracefully_shut_down_consumers ⇒ Object
    # File 'lib/march_hare/channel.rb', line 982
    def gracefully_shut_down_consumers
      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end
    end
#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 1001
    def guarding_against_stale_delivery_tags(tag, &block)
      case tag
        # if a fixnum was passed, execute unconditionally. MK.
      when Integer then
        block.call
        # versioned delivery tags should be checked to avoid
        # sending out stale (invalid) tags after channel was reopened
        # during network failure recovery. MK.
      when VersionedDeliveryTag then
        if !tag.stale?(@recoveries_counter.get)
          block.call
        end
      end
    end
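A self-contained sketch of the guard follows. `VersionedTag` is a hypothetical stand-in for `VersionedDeliveryTag`, and its `stale?` rule (issued under an older recovery count than the current one) is an assumption made for illustration.

```ruby
# Hypothetical stand-in for a delivery tag that remembers which channel
# "generation" (recovery count) it was issued under.
VersionedTag = Struct.new(:tag, :version) do
  # Assumed rule: a tag is stale once the channel has recovered since it was issued.
  def stale?(current_version)
    version < current_version
  end

  def to_i
    tag
  end
end

# Runs the block for plain integers, or for versioned tags that are still current.
def guard_against_stale_tag(tag, recoveries)
  case tag
  when Integer
    yield
  when VersionedTag
    yield unless tag.stale?(recoveries)
  end
end

acked = []
# Plain integer: always runs.
guard_against_stale_tag(1, 3) { acked << 1 }
# Issued under the current version: runs.
guard_against_stale_tag(VersionedTag.new(2, 3), 3) { acked << 2 }
# Issued before the latest recovery: skipped.
guard_against_stale_tag(VersionedTag.new(3, 2), 3) { acked << 3 }
acked # => [1, 2]
```

Skipping stale tags silently means acknowledging a delivery from before a recovery is a no-op in this sketch rather than an error.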
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 402
    def headers(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#increment_recoveries_counter ⇒ Object
    # File 'lib/march_hare/channel.rb', line 296
    def increment_recoveries_counter
      @recoveries_counter.increment_and_get
    end
#logger ⇒ ::Logger
Returns Logger instance from the connection.
    # File 'lib/march_hare/channel.rb', line 148
    def logger
      @connection.logger
    end
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.
    # File 'lib/march_hare/channel.rb', line 705
    def nack(delivery_tag, multiple = false, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
#next_publisher_seq_no ⇒ Object
    # File 'lib/march_hare/channel.rb', line 841
    def next_publisher_seq_no
      @delegate.next_publisher_seq_no
    end
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler
    # File 'lib/march_hare/channel.rb', line 881
    def on_confirm(&block)
      ch = BlockConfirmListener.from(block)
      self.add_confirm_listener(ch)
      @confirm_hooks << ch
    end
#on_return(&block) ⇒ Object
Defines a returned message handler.
    # File 'lib/march_hare/channel.rb', line 875
    def on_return(&block)
      self.add_return_listener(BlockReturnListener.from(block))
    end
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcasted when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.
    # File 'lib/march_hare/channel.rb', line 184
    def on_shutdown(&block)
      sh = ShutdownListener.new(self, &block)

      @shutdown_hooks << sh
      @delegate.add_shutdown_listener(sh)

      sh
    end
#open? ⇒ Boolean
Returns true if the channel is open.
    # File 'lib/march_hare/channel.rb', line 160
    def open?
      @delegate.open?
    end
#prefetch ⇒ Integer
Returns Active basic.qos prefetch setting.
    # File 'lib/march_hare/channel.rb', line 665
    def prefetch
      @prefetch_count || 0
    end
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages
    # File 'lib/march_hare/channel.rb', line 660
    def prefetch=(n)
      basic_qos(n)
    end
#qos(options = {}) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 650
    def qos(options = {})
      basic_qos(options.fetch(:prefetch_count, 0))
    end
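How `#prefetch`, `#prefetch=`, `#qos` and `#basic_qos` fit together can be sketched with a fake delegate. `RecordingDelegate` and `PrefetchSketch` are hypothetical classes that mirror the bookkeeping in the listings; they do not talk to a broker.

```ruby
# Hypothetical delegate that records the last basic.qos value it was sent.
class RecordingDelegate
  attr_reader :last_qos

  def basic_qos(prefetch_count)
    @last_qos = prefetch_count
    nil
  end
end

class PrefetchSketch
  def initialize(delegate)
    @delegate = delegate
  end

  def basic_qos(prefetch_count)
    result = @delegate.basic_qos(prefetch_count)
    # Remembered only after the delegate call has returned without raising.
    @prefetch_count = prefetch_count
    result
  end

  def prefetch=(n)
    basic_qos(n)
  end

  # Falls back to 0 when no value has been set yet.
  def prefetch
    @prefetch_count || 0
  end

  def qos(options = {})
    basic_qos(options.fetch(:prefetch_count, 0))
  end
end

delegate = RecordingDelegate.new
ch = PrefetchSketch.new(delegate)
before = ch.prefetch       # => 0, nothing set yet
ch.prefetch = 10
after_set = ch.prefetch    # => 10
ch.qos                     # no :prefetch_count given, so 0 is sent
after_qos = ch.prefetch    # => 0
```

Because the remembered value is what a recovery step would replay, calling `qos` without `:prefetch_count` resets it to 0 in this sketch.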
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
    # File 'lib/march_hare/channel.rb', line 485
    def queue(name, options = {})
      dq = Queue.new(self, name, options).tap do |q|
        q.declare!
      end

      self.register_queue(dq)
    end
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 552
    def queue_bind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_bind(queue, exchange, routing_key, arguments)
      end
    end
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 508
    def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
      end
    end
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.
    # File 'lib/march_hare/channel.rb', line 520
    def queue_declare_passive(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare_passive(name)
      end
    end
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 535
    def queue_delete(name, if_empty = false, if_unused = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_delete(name, if_empty, if_unused)
      end
    end
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 581
    def queue_purge(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_purge(name)
      end
    end
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 569
    def queue_unbind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_unbind(queue, exchange, routing_key, arguments)
      end
    end
#recover_confirm_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 226
    def recover_confirm_hooks
      @confirm_hooks.each do |ch|
        @delegate.add_confirm_listener(ch)
      end
    end
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 241
    def recover_confirm_mode
      confirm_select if defined?(@confirm_mode) && @confirm_mode
    end
#recover_consumers ⇒ Object
Recovers consumers. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 282
    def recover_consumers
      @consumers.values.each do |c|
        begin
          logger.debug("channel: recover consumer #{c.consumer_tag}")
          self.unregister_consumer(c.consumer_tag)
          c.recover_from_network_failure
        rescue Exception => e
          logger.error("Caught exception when recovering consumer #{c.consumer_tag}")
          logger.error(e)
        end
      end
    end
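The per-item `begin`/`rescue` in the listing means one consumer that fails to recover does not stop the others. A plain-Ruby sketch of that loop follows; `SketchConsumer` and `recover_all` are hypothetical names, and errors are collected in an array instead of being sent to a logger.

```ruby
# Hypothetical consumer whose recovery can be made to fail.
class SketchConsumer
  attr_reader :consumer_tag, :recovered

  def initialize(consumer_tag, broken = false)
    @consumer_tag = consumer_tag
    @broken = broken
    @recovered = false
  end

  def recover_from_network_failure
    raise "cannot recover #{consumer_tag}" if @broken
    @recovered = true
  end
end

# Attempts recovery for every consumer; a failure is recorded and the loop moves on.
def recover_all(consumers, errors)
  consumers.values.each do |c|
    begin
      c.recover_from_network_failure
    rescue StandardError => e
      errors << "Caught exception when recovering consumer #{c.consumer_tag}: #{e.message}"
    end
  end
end

consumers = {
  "a" => SketchConsumer.new("a"),
  "b" => SketchConsumer.new("b", true), # this one fails
  "c" => SketchConsumer.new("c")
}
errors = []
recover_all(consumers, errors)
```

After the call, consumers "a" and "c" are recovered and `errors` holds a single entry for "b".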
#recover_exchanges ⇒ Object
Recovers exchanges. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 254
    def recover_exchanges
      @exchanges.values.each do |x|
        begin
          logger.debug("channel: recover exchange #{x.name}")
          x.recover_from_network_failure
        rescue Exception => e
          logger.error("Caught exception when recovering exchange #{x.name}")
          logger.error(e)
        end
      end
    end
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 235
    def recover_prefetch_setting
      basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
    end
#recover_queues ⇒ Object
Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 268
    def recover_queues
      @queues.values.each do |q|
        begin
          logger.debug("channel: recover queue #{q.name}")
          q.recover_from_network_failure
        rescue Exception => e
          logger.error("Caught exception when recovering queue #{q.name}")
          logger.error(e)
        end
      end
    end
#recover_shutdown_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 219
    def recover_shutdown_hooks
      @shutdown_hooks.each do |sh|
        @delegate.add_shutdown_listener(sh)
      end
    end
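Hooks are kept on the Ruby side so they can be attached again once the delegate is replaced. A sketch of that idea, where `ListenerDelegate` and `HookKeepingChannel` are hypothetical classes and plain blocks stand in for `ShutdownListener` instances:

```ruby
# Hypothetical delegate that keeps a list of shutdown listeners.
class ListenerDelegate
  attr_reader :listeners

  def initialize
    @listeners = []
  end

  def add_shutdown_listener(listener)
    @listeners << listener
  end
end

class HookKeepingChannel
  def initialize(delegate)
    @delegate = delegate
    @shutdown_hooks = []
  end

  # Remember the hook locally and attach it to the current delegate.
  def on_shutdown(&block)
    @shutdown_hooks << block
    @delegate.add_shutdown_listener(block)
    block
  end

  # Swap in a new delegate, as happens after a recovery.
  def revive_with(new_delegate)
    @delegate = new_delegate
  end

  # Attach every remembered hook to whatever delegate is current.
  def recover_shutdown_hooks
    @shutdown_hooks.each { |hook| @delegate.add_shutdown_listener(hook) }
  end
end

old_delegate = ListenerDelegate.new
ch = HookKeepingChannel.new(old_delegate)
ch.on_shutdown { |cause| puts "closed: #{cause}" }

new_delegate = ListenerDelegate.new
ch.revive_with(new_delegate)   # a fresh delegate knows nothing about earlier hooks
ch.recover_shutdown_hooks      # so the remembered hooks are attached again
```

Without the local list, hooks registered before a recovery would be lost together with the old delegate.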
#recover_tx_mode ⇒ Object
Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 247
    def recover_tx_mode
      tx_select if defined?(@tx_mode) && @tx_mode
    end
#register_consumer(consumer_tag, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 970
    def register_consumer(consumer_tag, consumer)
      logger.debug("channel: register consumer #{consumer_tag}")
      @consumers[consumer_tag] = consumer
    end
#register_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 964
    def register_exchange(exchange)
      logger.debug("channel: register exchange #{exchange.name}")
      @exchanges[exchange.name] = exchange
    end
#register_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 947
    def register_queue(queue)
      logger.debug("channel: register queue #{queue.name}")
      @queues[queue.name] = queue
    end
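The register, find and deregister methods amount to a registry keyed by name. A minimal sketch, assuming a plain `Hash` in place of the concurrent map used by the channel (so it is not thread-safe); `NamedQueue` and `QueueRegistry` are hypothetical names.

```ruby
# Hypothetical queue object; only the name matters for the registry.
NamedQueue = Struct.new(:name)

class QueueRegistry
  def initialize
    @queues = {}
  end

  # Stores the queue under its name and returns it.
  def register_queue(queue)
    @queues[queue.name] = queue
  end

  def find_queue(name)
    @queues[name]
  end

  def deregister_queue(queue)
    @queues.delete(queue.name)
  end

  def deregister_queue_named(name)
    @queues.delete(name)
  end
end

registry = QueueRegistry.new
first  = registry.register_queue(NamedQueue.new("events"))
second = registry.register_queue(NamedQueue.new("events")) # same name replaces the earlier entry
found  = registry.find_queue("events")
registry.deregister_queue_named("events")
gone   = registry.find_queue("events")                     # => nil
```

Registering a second object under the same name replaces the first entry, which matches plain key assignment in the listings.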
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
    # File 'lib/march_hare/channel.rb', line 690
    def reject(delivery_tag, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_reject(delivery_tag.to_i, requeue)
      end
    end
#revive_with(java_ch) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 214
    def revive_with(java_ch)
      @delegate = java_ch
    end
#session ⇒ MarchHare::Session Also known as: client, connection
Returns Connection this channel is on.
    # File 'lib/march_hare/channel.rb', line 141
    def session
      @connection
    end
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 381
    def topic(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#tx_commit ⇒ Object
Commits a transaction
    # File 'lib/march_hare/channel.rb', line 860
    def tx_commit
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_commit
      end
    end
#tx_rollback ⇒ Object
Rolls back a transaction
    # File 'lib/march_hare/channel.rb', line 867
    def tx_rollback
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_rollback
      end
    end
#tx_select ⇒ Object
Enables transactions on the channel
    # File 'lib/march_hare/channel.rb', line 846
    def tx_select
      converting_rjc_exceptions_to_ruby do
        @tx_mode = true
        @delegate.tx_select
      end
    end
#unregister_consumer(consumer_tag) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 976
    def unregister_consumer(consumer_tag)
      logger.debug("channel: unregister consumer #{consumer_tag}")
      @consumers.delete(consumer_tag)
    end
#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?
Returns true if publisher confirms are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 818
    def using_publisher_confirms?
      !!@confirm_mode
    end
#using_tx? ⇒ Boolean Also known as: uses_tx?
Returns true if transactions are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 854
    def using_tx?
      !!@tx_mode
    end
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Takes an optional timeout in milliseconds. Raises an exception if the timeout elapses before all confirms arrive.
    # File 'lib/march_hare/channel.rb', line 831
    def wait_for_confirms(timeout = nil)
      if timeout
        converting_rjc_exceptions_to_ruby do
          @delegate.wait_for_confirms(timeout)
        end
      else
        @delegate.wait_for_confirms
      end
    end