Class: MarchHare::Channel
- Inherits:
-
Object
- Object
- MarchHare::Channel
- Defined in:
- lib/march_hare/channel.rb
Overview
## Channels in RabbitMQ
To quote AMQP 0.9.1 specification:
AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
## Opening Channels
Channels can be opened either via ‘MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly:
This will automatically allocate a channel id.
## Closing Channels
Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.
## Higher-level API
MarchHare offers two sets of methods on Channel: known as higher-level and lower-level APIs, respectively. Higher-level API mimics amqp gem API where exchanges and queues are objects (instance of Exchange and Queue, respectively). Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).
### Queue Operations In Higher-level API
### Exchange Operations In Higher-level API
-
#topic declares a topic exchange. The rest of the API is in Exchange.
-
#direct declares a direct exchange.
-
#fanout declares a fanout exchange.
-
#headers declares a headers exchange.
-
#exchange is used to declare exchanges with type specified as a symbol or string.
## Channel Qos (Prefetch Level)
It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
## Channel IDs
Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.
There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.
## Channels and Error Handling
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying ‘channel.close` method information.
Defined Under Namespace
Classes: BlockConfirmListener, BlockReturnListener
Instance Attribute Summary collapse
-
#consumers ⇒ Array<MarchHare::Consumer>
readonly
Consumers on this channel.
-
#recoveries_counter ⇒ Object
readonly
Returns the value of attribute recoveries_counter.
Exchanges collapse
-
#default_exchange ⇒ Object
Provides access to the default exchange.
-
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
-
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
-
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares a echange using echange.declare AMQP 0.9.1 method.
-
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
-
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
-
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
Queues collapse
-
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
-
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
-
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
-
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method.
-
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method.
-
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
-
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
basic.* collapse
-
#ack(delivery_tag, multiple = false) ⇒ Object
(also: #acknowledge)
Acknowledges a message.
-
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
- #basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
- #basic_get(queue, auto_ack) ⇒ Object
-
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
-
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
- #basic_qos(prefetch_count) ⇒ Object
-
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
-
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message.
-
#prefetch ⇒ Integer
Active basic.qos prefetch setting.
-
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
- #qos(options = {}) ⇒ Object
-
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message.
Instance Method Summary collapse
- #automatically_recover(session, java_connection) ⇒ Object
-
#channel_number ⇒ Integer
(also: #id, #number)
Channel id.
-
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
-
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
-
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
- #deregister_exchange(exchange) ⇒ Object
- #deregister_queue(queue) ⇒ Object
- #deregister_queue_named(name) ⇒ Object
- #find_queue(name) ⇒ Object
- #gracefully_shut_down_consumers ⇒ Object
- #guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
- #increment_recoveries_counter ⇒ Object
-
#initialize(session, delegate) ⇒ Channel
constructor
A new instance of Channel.
- #method_missing(selector, *args) ⇒ Object
- #next_publisher_seq_no ⇒ Object
-
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler.
-
#on_return(&block) ⇒ Object
Defines a returned message handler.
-
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback.
-
#open? ⇒ Boolean
True if the channel is open.
- #recover_confirm_hooks ⇒ Object
-
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode.
-
#recover_consumers ⇒ Object
Recovers consumers.
-
#recover_exchanges ⇒ Object
Recovers exchanges.
-
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting.
-
#recover_queues ⇒ Object
Recovers queues and bindings.
- #recover_shutdown_hooks ⇒ Object
-
#recover_tx_mode ⇒ Object
Recovers transaction mode.
- #register_consumer(consumer_tag, consumer) ⇒ Object
- #register_exchange(exchange) ⇒ Object
- #register_queue(queue) ⇒ Object
- #revive_with(java_ch) ⇒ Object
-
#session ⇒ MarchHare::Session
(also: #client, #connection)
Connection this channel is on.
-
#tx_commit ⇒ Object
Commits a transaction.
-
#tx_rollback ⇒ Object
Rolls back a transaction.
-
#tx_select ⇒ Object
Enables transactions on the channel.
- #unregister_consumer(consumer_tag) ⇒ Object
-
#using_publisher_confirms? ⇒ Boolean
(also: #uses_publisher_confirms?)
True if publisher confirms are enabled for this channel.
-
#using_tx? ⇒ Boolean
(also: #uses_tx?)
True if transactions are enabled for this channel.
-
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Constructor Details
#initialize(session, delegate) ⇒ Channel
Returns a new instance of Channel.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/march_hare/channel.rb', line 121 def initialize(session, delegate) @connection = session @delegate = delegate @exchanges = JavaConcurrent::ConcurrentHashMap.new @queues = JavaConcurrent::ConcurrentHashMap.new # we keep track of consumers in part to gracefully shut down their # executors when the channel is closed. This frees library users # from having to worry about this. MK. @consumers = JavaConcurrent::ConcurrentHashMap.new @shutdown_hooks = Array.new @confirm_hooks = Array.new @recoveries_counter = JavaConcurrent::AtomicInteger.new(0) on_shutdown do |ch, cause| ch.gracefully_shut_down_consumers end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
877 878 879 |
# File 'lib/march_hare/channel.rb', line 877 def method_missing(selector, *args) @delegate.__send__(selector, *args) end |
Instance Attribute Details
#consumers ⇒ Array<MarchHare::Consumer> (readonly)
Returns Consumers on this channel.
118 119 120 |
# File 'lib/march_hare/channel.rb', line 118 def consumers @consumers end |
#recoveries_counter ⇒ Object (readonly)
Returns the value of attribute recoveries_counter.
290 291 292 |
# File 'lib/march_hare/channel.rb', line 290 def recoveries_counter @recoveries_counter end |
Instance Method Details
#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge
Acknowledges a message. Acknowledged messages are completely removed from the queue.
665 666 667 668 669 |
# File 'lib/march_hare/channel.rb', line 665 def ack(delivery_tag, multiple = false) (delivery_tag) do basic_ack(delivery_tag.to_i, multiple) end end |
#automatically_recover(session, java_connection) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/march_hare/channel.rb', line 189 def automatically_recover(session, java_connection) jch = java_connection.create_channel(id) self.revive_with(jch) self.recover_shutdown_hooks self.recover_prefetch_setting self.recover_confirm_mode self.recover_confirm_hooks self.recover_tx_mode self.recover_exchanges # # this includes bindings recovery self.recover_queues self.recover_consumers self.increment_recoveries_counter end |
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
751 752 753 754 755 |
# File 'lib/march_hare/channel.rb', line 751 def basic_ack(delivery_tag, multiple) converting_rjc_exceptions_to_ruby do @delegate.basic_ack(delivery_tag.to_i, multiple) end end |
#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
617 618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'lib/march_hare/channel.rb', line 617 def basic_consume(queue, auto_ack, consumer_tag = nil, consumer) consumer.auto_ack = auto_ack tag = converting_rjc_exceptions_to_ruby do if consumer_tag @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer) else @delegate.basic_consume(queue, auto_ack, consumer) end end self.register_consumer(tag, consumer) tag end |
#basic_get(queue, auto_ack) ⇒ Object
611 612 613 614 615 |
# File 'lib/march_hare/channel.rb', line 611 def basic_get(queue, auto_ack) converting_rjc_exceptions_to_ruby do @delegate.basic_get(queue, auto_ack) end end |
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
767 768 769 770 771 |
# File 'lib/march_hare/channel.rb', line 767 def basic_nack(delivery_tag, multiple = false, requeue = false) converting_rjc_exceptions_to_ruby do @delegate.basic_nack(delivery_tag.to_i, multiple, requeue) end end |
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
605 606 607 608 609 |
# File 'lib/march_hare/channel.rb', line 605 def basic_publish(exchange, routing_key, mandatory, properties, body) converting_rjc_exceptions_to_ruby do @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body) end end |
#basic_qos(prefetch_count) ⇒ Object
631 632 633 634 635 636 637 638 |
# File 'lib/march_hare/channel.rb', line 631 def basic_qos(prefetch_count) r = converting_rjc_exceptions_to_ruby do @delegate.basic_qos(prefetch_count) end @prefetch_count = prefetch_count r end |
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages
777 778 779 780 781 |
# File 'lib/march_hare/channel.rb', line 777 def basic_recover(requeue = true) converting_rjc_exceptions_to_ruby do @delegate.basic_recover(requeue) end end |
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages
787 788 789 790 791 |
# File 'lib/march_hare/channel.rb', line 787 def basic_recover_async(requeue = true) converting_rjc_exceptions_to_ruby do @delegate.basic_recover_async(requeue) end end |
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
738 739 740 741 742 |
# File 'lib/march_hare/channel.rb', line 738 def basic_reject(delivery_tag, requeue) converting_rjc_exceptions_to_ruby do @delegate.basic_reject(delivery_tag.to_i, requeue) end end |
#channel_number ⇒ Integer Also known as: id, number
Returns Channel id.
148 149 150 |
# File 'lib/march_hare/channel.rb', line 148 def channel_number @delegate.channel_number end |
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.
164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/march_hare/channel.rb', line 164 def close(code = 200, reason = "Goodbye") v = @delegate.close(code, reason) @consumers.each do |tag, consumer| consumer.gracefully_shut_down end @connection.unregister_channel(self) v end |
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
800 801 802 803 804 805 |
# File 'lib/march_hare/channel.rb', line 800 def confirm_select converting_rjc_exceptions_to_ruby do @confirm_mode = true @delegate.confirm_select end end |
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
976 977 978 979 980 981 982 |
# File 'lib/march_hare/channel.rb', line 976 def converting_rjc_exceptions_to_ruby(&block) begin block.call rescue Exception, java.lang.Throwable => e Exceptions.convert_and_reraise(e) end end |
#default_exchange ⇒ Object
Provides access to the default exchange
402 403 404 |
# File 'lib/march_hare/channel.rb', line 402 def default_exchange @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct") end |
#deregister_exchange(exchange) ⇒ Object
946 947 948 |
# File 'lib/march_hare/channel.rb', line 946 def deregister_exchange(exchange) @exchanges.delete(exchange.name) end |
#deregister_queue(queue) ⇒ Object
926 927 928 |
# File 'lib/march_hare/channel.rb', line 926 def deregister_queue(queue) @queues.delete(queue.name) end |
#deregister_queue_named(name) ⇒ Object
931 932 933 |
# File 'lib/march_hare/channel.rb', line 931 def deregister_queue_named(name) @queues.delete(name) end |
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
350 351 352 353 354 355 356 |
# File 'lib/march_hare/channel.rb', line 350 def direct(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x| x.declare! end self.register_exchange(dx) end |
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
308 309 310 311 312 313 314 |
# File 'lib/march_hare/channel.rb', line 308 def exchange(name, ={}) dx = Exchange.new(self, name, ).tap do |x| x.declare! end self.register_exchange(dx) end |
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)
434 435 436 437 438 |
# File 'lib/march_hare/channel.rb', line 434 def exchange_bind(destination, source, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_bind(destination, source, routing_key, arguments) end end |
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares a echange using echange.declare AMQP 0.9.1 method.
417 418 419 420 421 |
# File 'lib/march_hare/channel.rb', line 417 def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments) end end |
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)
451 452 453 454 455 |
# File 'lib/march_hare/channel.rb', line 451 def exchange_unbind(destination, source, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_unbind(destination, source, routing_key, arguments) end end |
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
329 330 331 332 333 334 335 |
# File 'lib/march_hare/channel.rb', line 329 def fanout(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x| x.declare! end self.register_exchange(dx) end |
#find_queue(name) ⇒ Object
941 942 943 |
# File 'lib/march_hare/channel.rb', line 941 def find_queue(name) @queues[name] end |
#gracefully_shut_down_consumers ⇒ Object
966 967 968 969 970 |
# File 'lib/march_hare/channel.rb', line 966 def gracefully_shut_down_consumers @consumers.each do |tag, consumer| consumer.gracefully_shut_down end end |
#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
985 986 987 988 989 990 991 992 993 994 995 996 997 998 |
# File 'lib/march_hare/channel.rb', line 985 def (tag, &block) case tag # if a fixnum was passed, execute unconditionally. MK. when Fixnum then block.call # versioned delivery tags should be checked to avoid # sending out stale (invalid) tags after channel was reopened # during network failure recovery. MK. when VersionedDeliveryTag then if !tag.stale?(@recoveries_counter.get) block.call end end end |
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
392 393 394 395 396 397 398 |
# File 'lib/march_hare/channel.rb', line 392 def headers(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x| x.declare! end self.register_exchange(dx) end |
#increment_recoveries_counter ⇒ Object
286 287 288 |
# File 'lib/march_hare/channel.rb', line 286 def increment_recoveries_counter @recoveries_counter.increment_and_get end |
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.
695 696 697 698 699 |
# File 'lib/march_hare/channel.rb', line 695 def nack(delivery_tag, multiple = false, requeue = false) (delivery_tag) do basic_nack(delivery_tag.to_i, multiple, requeue) end end |
#next_publisher_seq_no ⇒ Object
831 832 833 |
# File 'lib/march_hare/channel.rb', line 831 def next_publisher_seq_no @delegate.next_publisher_seq_no end |
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler
871 872 873 874 875 |
# File 'lib/march_hare/channel.rb', line 871 def on_confirm(&block) ch = BlockConfirmListener.from(block) self.add_confirm_listener(ch) @confirm_hooks << ch end |
#on_return(&block) ⇒ Object
Defines a returned message handler.
865 866 867 |
# File 'lib/march_hare/channel.rb', line 865 def on_return(&block) self.add_return_listener(BlockReturnListener.from(block)) end |
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcasted when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.
179 180 181 182 183 184 185 186 |
# File 'lib/march_hare/channel.rb', line 179 def on_shutdown(&block) sh = ShutdownListener.new(self, &block) @shutdown_hooks << sh @delegate.add_shutdown_listener(sh) sh end |
#open? ⇒ Boolean
Returns true if the channel is open.
155 156 157 |
# File 'lib/march_hare/channel.rb', line 155 def open? @delegate.open? end |
#prefetch ⇒ Integer
Returns Active basic.qos prefetch setting.
655 656 657 |
# File 'lib/march_hare/channel.rb', line 655 def prefetch @prefetch_count || 0 end |
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages
650 651 652 |
# File 'lib/march_hare/channel.rb', line 650 def prefetch=(n) basic_qos(n) end |
#qos(options = {}) ⇒ Object
640 641 642 |
# File 'lib/march_hare/channel.rb', line 640 def qos(={}) basic_qos(.fetch(:prefetch_count, 0)) end |
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
475 476 477 478 479 480 481 |
# File 'lib/march_hare/channel.rb', line 475 def queue(name, ={}) dq = Queue.new(self, name, ).tap do |q| q.declare! end self.register_queue(dq) end |
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
542 543 544 545 546 |
# File 'lib/march_hare/channel.rb', line 542 def queue_bind(queue, exchange, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.queue_bind(queue, exchange, routing_key, arguments) end end |
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
498 499 500 501 502 |
# File 'lib/march_hare/channel.rb', line 498 def queue_declare(name, durable, exclusive, auto_delete, arguments = {}) converting_rjc_exceptions_to_ruby do @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments) end end |
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.
510 511 512 513 514 |
# File 'lib/march_hare/channel.rb', line 510 def queue_declare_passive(name) converting_rjc_exceptions_to_ruby do @delegate.queue_declare_passive(name) end end |
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method
525 526 527 528 529 |
# File 'lib/march_hare/channel.rb', line 525 def queue_delete(name, if_empty = false, if_unused = false) converting_rjc_exceptions_to_ruby do @delegate.queue_delete(name, if_empty, if_unused) end end |
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
571 572 573 574 575 |
# File 'lib/march_hare/channel.rb', line 571 def queue_purge(name) converting_rjc_exceptions_to_ruby do @delegate.queue_purge(name) end end |
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
559 560 561 562 563 |
# File 'lib/march_hare/channel.rb', line 559 def queue_unbind(queue, exchange, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.queue_unbind(queue, exchange, routing_key, arguments) end end |
#recover_confirm_hooks ⇒ Object
219 220 221 222 223 |
# File 'lib/march_hare/channel.rb', line 219 def recover_confirm_hooks @confirm_hooks.each do |ch| @delegate.add_confirm_listener(ch) end end |
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.
234 235 236 |
# File 'lib/march_hare/channel.rb', line 234 def recover_confirm_mode confirm_select if defined?(@confirm_mode) && @confirm_mode end |
#recover_consumers ⇒ Object
Recovers consumers. Used by the Automatic Network Failure Recovery feature.
273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/march_hare/channel.rb', line 273 def recover_consumers @consumers.values.each do |c| begin self.unregister_consumer(c.consumer_tag) c.recover_from_network_failure rescue Exception => e # TODO: logger $stderr.puts "Caught exception when recovering consumer #{c.consumer_tag}" end end end |
#recover_exchanges ⇒ Object
Recovers exchanges. Used by the Automatic Network Failure Recovery feature.
247 248 249 250 251 252 253 254 255 256 |
# File 'lib/march_hare/channel.rb', line 247 def recover_exchanges @exchanges.values.each do |x| begin x.recover_from_network_failure rescue Exception => e # TODO: logger $stderr.puts "Caught exception when recovering exchange #{x.name}" end end end |
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.
228 229 230 |
# File 'lib/march_hare/channel.rb', line 228 def recover_prefetch_setting basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count end |
#recover_queues ⇒ Object
Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.
260 261 262 263 264 265 266 267 268 269 |
# File 'lib/march_hare/channel.rb', line 260 def recover_queues @queues.values.each do |q| begin q.recover_from_network_failure rescue Exception => e # TODO: logger $stderr.puts "Caught exception when recovering queue #{q.name}" end end end |
#recover_shutdown_hooks ⇒ Object
212 213 214 215 216 |
# File 'lib/march_hare/channel.rb', line 212 def recover_shutdown_hooks @shutdown_hooks.each do |sh| @delegate.add_shutdown_listener(sh) end end |
#recover_tx_mode ⇒ Object
Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.
240 241 242 |
# File 'lib/march_hare/channel.rb', line 240 def recover_tx_mode tx_select if defined?(@tx_mode) && @tx_mode end |
#register_consumer(consumer_tag, consumer) ⇒ Object
956 957 958 |
# File 'lib/march_hare/channel.rb', line 956 def register_consumer(consumer_tag, consumer) @consumers[consumer_tag] = consumer end |
#register_exchange(exchange) ⇒ Object
951 952 953 |
# File 'lib/march_hare/channel.rb', line 951 def register_exchange(exchange) @exchanges[exchange.name] = exchange end |
#register_queue(queue) ⇒ Object
936 937 938 |
# File 'lib/march_hare/channel.rb', line 936 def register_queue(queue) @queues[queue.name] = queue end |
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
680 681 682 683 684 |
# File 'lib/march_hare/channel.rb', line 680 def reject(delivery_tag, requeue = false) (delivery_tag) do basic_reject(delivery_tag.to_i, requeue) end end |
#revive_with(java_ch) ⇒ Object
207 208 209 |
# File 'lib/march_hare/channel.rb', line 207 def revive_with(java_ch) @delegate = java_ch end |
#session ⇒ MarchHare::Session Also known as: client, connection
Returns Connection this channel is on.
141 142 143 |
# File 'lib/march_hare/channel.rb', line 141 def session @connection end |
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
371 372 373 374 375 376 377 |
# File 'lib/march_hare/channel.rb', line 371 def topic(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x| x.declare! end self.register_exchange(dx) end |
#tx_commit ⇒ Object
Commits a transaction
850 851 852 853 854 |
# File 'lib/march_hare/channel.rb', line 850 def tx_commit converting_rjc_exceptions_to_ruby do @delegate.tx_commit end end |
#tx_rollback ⇒ Object
Rolls back a transaction
857 858 859 860 861 |
# File 'lib/march_hare/channel.rb', line 857 def tx_rollback converting_rjc_exceptions_to_ruby do @delegate.tx_rollback end end |
#tx_select ⇒ Object
Enables transactions on the channel
836 837 838 839 840 841 |
# File 'lib/march_hare/channel.rb', line 836 def tx_select converting_rjc_exceptions_to_ruby do @tx_mode = true @delegate.tx_select end end |
#unregister_consumer(consumer_tag) ⇒ Object
961 962 963 |
# File 'lib/march_hare/channel.rb', line 961 def unregister_consumer(consumer_tag) @consumers.delete(consumer_tag) end |
#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?
Returns true if publisher confirms are enabled for this channel.
808 809 810 |
# File 'lib/march_hare/channel.rb', line 808 def using_publisher_confirms? !!@confirm_mode end |
#using_tx? ⇒ Boolean Also known as: uses_tx?
Returns true if transactions are enabled for this channel.
844 845 846 |
# File 'lib/march_hare/channel.rb', line 844 def using_tx? !!@tx_mode end |
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Takes an optional timeout in milliseconds. Will raise an exception in timeout has occured.
821 822 823 824 825 826 827 828 829 |
# File 'lib/march_hare/channel.rb', line 821 def wait_for_confirms(timeout = nil) if timeout converting_rjc_exceptions_to_ruby do @delegate.wait_for_confirms(timeout) end else @delegate.wait_for_confirms end end |