Class: MarchHare::Channel
- Inherits: Object
- Inheritance chain: Object → MarchHare::Channel
- Defined in: lib/march_hare/channel.rb
Overview
## Channels in RabbitMQ
To quote the AMQP 0.9.1 specification:
AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
## Opening Channels
Channels can be opened either via `MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly. A channel id is allocated automatically.
## Closing Channels
Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.
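The open, use and close lifecycle can be sketched as follows. `with_channel` is a hypothetical helper, not part of MarchHare; it relies only on `Session#create_channel`, `Channel#open?` and `Channel#close`, and assumes `session` is an already connected `MarchHare::Session`.

```ruby
# Hypothetical helper (not part of MarchHare): opens a channel on the given
# session, yields it, and closes it afterwards if it is still open.
def with_channel(session)
  ch = session.create_channel
  begin
    yield ch
  ensure
    # A channel-level exception may already have closed the channel.
    ch.close if ch.open?
  end
end
```

With a real connection this might be called as `with_channel(conn) { |ch| ch.queue("events") }`, where `conn` is the session the application already holds.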
## Higher-level API
MarchHare offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).
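As an illustration of the lower-level style, the sketch below declares and binds entities by name only. `declare_topology` and its parameter names are hypothetical; the three channel methods and their argument order are taken from the method summaries on this page.

```ruby
# Hypothetical helper: declares a durable direct exchange and a durable queue,
# then binds them, passing every entity as a plain string.
def declare_topology(ch, exchange_name, queue_name, routing_key)
  # exchange_declare(name, type, durable, auto_delete, internal, arguments)
  ch.exchange_declare(exchange_name, "direct", true)
  # queue_declare(name, durable, exclusive, auto_delete, arguments)
  ch.queue_declare(queue_name, true, false, false)
  # queue_bind(queue, exchange, routing_key, arguments)
  ch.queue_bind(queue_name, exchange_name, routing_key)
end
```

The higher-level equivalents (`#direct`, `#queue`) return Exchange and Queue objects instead and register them with the channel.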
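### Queue Operations In Higher-level API
-
#queue declares a queue or looks it up in the per-channel cache. The rest of the API is in Queue.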
### Exchange Operations In Higher-level API
-
#topic declares a topic exchange. The rest of the API is in Exchange.
-
#direct declares a direct exchange.
-
#fanout declares a fanout exchange.
-
#headers declares a headers exchange.
-
#exchange is used to declare exchanges with type specified as a symbol or string.
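A minimal sketch of the exchange helpers listed above. `declare_exchanges` and the `prefix` parameter are hypothetical; the `:durable` and `:type` options are the ones that appear in the `#default_exchange` listing on this page.

```ruby
# Hypothetical helper: declares one exchange of each kind through the
# higher-level API and returns them keyed by how they were declared.
def declare_exchanges(ch, prefix)
  {
    :topic   => ch.topic("#{prefix}.topic", :durable => true),
    :direct  => ch.direct("#{prefix}.direct", :durable => true),
    :fanout  => ch.fanout("#{prefix}.fanout", :durable => true),
    :headers => ch.headers("#{prefix}.headers", :durable => true),
    # #exchange takes the type as an option (symbol or string).
    :custom  => ch.exchange("#{prefix}.custom", :type => :topic, :durable => true)
  }
end
```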
## Channel QoS (Prefetch Level)
It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
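For example, a consumer that should never hold more than a handful of unacknowledged deliveries could set the prefetch level as sketched below. `limit_unacked` is a hypothetical helper; it uses only `#prefetch=` and `#prefetch`.

```ruby
# Hypothetical helper: caps the number of unacknowledged deliveries on a
# channel and returns the value the channel reports afterwards.
def limit_unacked(ch, count)
  ch.prefetch = count
  ch.prefetch
end
```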
## Channel IDs
Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.
There is a limit on the maximum number of channels per connection; it is negotiated with the server and cannot exceed 65535 because channel ids are 16-bit integers. Note that allocating channels is very cheap on both client and server, so having tens, hundreds or even thousands of channels is possible.
## Channels and Error Handling
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying `channel.close` method information.
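One way to recover from such an exception is to run the risky operation on a throwaway channel, since a channel that receives a channel-level exception is closed. The sketch below is an assumption-laden illustration: `queue_exists?` and `NOT_FOUND` are hypothetical names, and the stand-in class exists only so the snippet runs without the gem loaded.

```ruby
# With MarchHare loaded the real exception class is used; otherwise a stand-in
# keeps the sketch runnable on its own.
NOT_FOUND = defined?(MarchHare::NotFound) ? MarchHare::NotFound : Class.new(StandardError)

# Hypothetical helper: checks for a queue on a throwaway channel, because a
# failed passive declare closes the channel it was issued on.
def queue_exists?(session, name)
  ch = session.create_channel
  ch.queue_declare_passive(name)
  true
rescue NOT_FOUND
  false
ensure
  ch.close if ch && ch.open?
end
```

The channel the application uses for its real work is left untouched whether or not the queue exists.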
Defined Under Namespace
Classes: BlockConfirmListener, BlockReturnListener
Instance Attribute Summary
-
#consumers ⇒ Array<MarchHare::Consumer>
readonly
Consumers on this channel.
-
#recoveries_counter ⇒ Object
readonly
Returns the value of attribute recoveries_counter.
Exchanges
-
#default_exchange ⇒ Object
Provides access to the default exchange.
-
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
-
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
-
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
-
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares an exchange using exchange.declare AMQP 0.9.1 method.
-
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
-
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
-
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
Queues
-
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
-
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
-
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
-
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method.
-
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method.
-
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
-
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
basic.*
-
#ack(delivery_tag, multiple = false) ⇒ Object
(also: #acknowledge)
Acknowledges a message.
-
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
- #basic_consume(queue, auto_ack, consumer) ⇒ Object
- #basic_get(queue, auto_ack) ⇒ Object
-
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
-
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
- #basic_qos(prefetch_count) ⇒ Object
-
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
-
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message.
-
#prefetch ⇒ Integer
Active basic.qos prefetch setting.
-
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
- #qos(options = {}) ⇒ Object
-
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message.
Instance Method Summary
- #automatically_recover(session, java_connection) ⇒ Object
-
#channel_number ⇒ Integer
(also: #id, #number)
Channel id.
-
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
-
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
-
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions thrown by the RabbitMQ Java client and converting them to Ruby exceptions that are then re-raised.
- #deregister_exchange(exchange) ⇒ Object
- #deregister_queue(queue) ⇒ Object
- #deregister_queue_named(name) ⇒ Object
- #find_queue(name) ⇒ Object
- #gracefully_shut_down_consumers ⇒ Object
- #guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
- #increment_recoveries_counter ⇒ Object
-
#initialize(session, delegate) ⇒ Channel
constructor
A new instance of Channel.
- #method_missing(selector, *args) ⇒ Object
- #next_publisher_seq_no ⇒ Object
-
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler.
-
#on_return(&block) ⇒ Object
Defines a returned message handler.
-
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback.
-
#open? ⇒ Boolean
True if the channel is open.
- #recover_confirm_hooks ⇒ Object
-
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode.
-
#recover_consumers ⇒ Object
Recovers consumers.
-
#recover_exchanges ⇒ Object
Recovers exchanges.
-
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting.
-
#recover_queues ⇒ Object
Recovers queues and bindings.
- #recover_shutdown_hooks ⇒ Object
-
#recover_tx_mode ⇒ Object
Recovers transaction mode.
- #register_consumer(consumer_tag, consumer) ⇒ Object
- #register_exchange(exchange) ⇒ Object
- #register_queue(queue) ⇒ Object
- #revive_with(java_ch) ⇒ Object
-
#session ⇒ MarchHare::Session
(also: #client, #connection)
Connection this channel is on.
-
#tx_commit ⇒ Object
Commits a transaction.
-
#tx_rollback ⇒ Object
Rolls back a transaction.
-
#tx_select ⇒ Object
Enables transactions on the channel.
- #unregister_consumer(consumer_tag) ⇒ Object
-
#using_publisher_confirms? ⇒ Boolean
(also: #uses_publisher_confirms?)
True if publisher confirms are enabled for this channel.
-
#using_tx? ⇒ Boolean
(also: #uses_tx?)
True if transactions are enabled for this channel.
-
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Constructor Details
#initialize(session, delegate) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/march_hare/channel.rb', line 121
    def initialize(session, delegate)
      @connection = session
      @delegate = delegate

      @exchanges = JavaConcurrent::ConcurrentHashMap.new
      @queues = JavaConcurrent::ConcurrentHashMap.new
      # we keep track of consumers in part to gracefully shut down their
      # executors when the channel is closed. This frees library users
      # from having to worry about this. MK.
      @consumers = JavaConcurrent::ConcurrentHashMap.new
      @shutdown_hooks = Array.new
      @confirm_hooks = Array.new
      @recoveries_counter = JavaConcurrent::AtomicInteger.new(0)

      on_shutdown do |ch, cause|
        ch.gracefully_shut_down_consumers
      end
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 867
    def method_missing(selector, *args)
      @delegate.__send__(selector, *args)
    end
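The delegation pattern can be sketched in plain Ruby as follows. Calls such as `add_return_listener` in `#on_return` are not defined on Channel in this listing, so they appear to reach the wrapped Java channel this way. `FakeDelegate` and `ForwardingWrapper` are hypothetical stand-ins, and `respond_to_missing?` is an addition that the listing above does not contain.

```ruby
# Stand-in for the wrapped Java channel; only here to make the sketch runnable.
class FakeDelegate
  attr_reader :listeners

  def initialize
    @listeners = []
  end

  def add_return_listener(listener)
    @listeners << listener
    listener
  end
end

# Minimal wrapper that forwards unknown messages to its delegate, in the same
# spirit as Channel#method_missing.
class ForwardingWrapper
  def initialize(delegate)
    @delegate = delegate
  end

  def method_missing(selector, *args, &block)
    if @delegate.respond_to?(selector)
      @delegate.__send__(selector, *args, &block)
    else
      super
    end
  end

  # Not in the listing above; added so respond_to? agrees with method_missing.
  def respond_to_missing?(selector, include_private = false)
    @delegate.respond_to?(selector, include_private) || super
  end
end

delegate = FakeDelegate.new
wrapper = ForwardingWrapper.new(delegate)
wrapper.add_return_listener(:my_listener) # forwarded to the delegate
```

Unlike the listing, the sketch falls back to `super` for messages the delegate does not understand, so a typo raises the usual `NoMethodError` on the wrapper.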
Instance Attribute Details
#consumers ⇒ Array<MarchHare::Consumer> (readonly)
Returns Consumers on this channel.
    # File 'lib/march_hare/channel.rb', line 118
    def consumers
      @consumers
    end
#recoveries_counter ⇒ Object (readonly)
Returns the value of attribute recoveries_counter.
    # File 'lib/march_hare/channel.rb', line 290
    def recoveries_counter
      @recoveries_counter
    end
Instance Method Details
#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge
Acknowledges a message. Acknowledged messages are completely removed from the queue.
    # File 'lib/march_hare/channel.rb', line 655
    def ack(delivery_tag, multiple = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_ack(delivery_tag.to_i, multiple)
      end
    end
#automatically_recover(session, java_connection) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 189
    def automatically_recover(session, java_connection)
      jch = java_connection.create_channel(id)

      self.revive_with(jch)
      self.recover_shutdown_hooks

      self.recover_prefetch_setting
      self.recover_confirm_mode
      self.recover_confirm_hooks
      self.recover_tx_mode
      self.recover_exchanges
      # this includes bindings recovery
      self.recover_queues
      self.recover_consumers
      self.increment_recoveries_counter
    end
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
    # File 'lib/march_hare/channel.rb', line 741
    def basic_ack(delivery_tag, multiple)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_ack(delivery_tag.to_i, multiple)
      end
    end
#basic_consume(queue, auto_ack, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 611
    def basic_consume(queue, auto_ack, consumer)
      consumer.auto_ack = auto_ack
      tag = converting_rjc_exceptions_to_ruby do
        @delegate.basic_consume(queue, auto_ack, consumer)
      end
      self.register_consumer(tag, consumer)

      tag
    end
#basic_get(queue, auto_ack) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 605
    def basic_get(queue, auto_ack)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_get(queue, auto_ack)
      end
    end
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
    # File 'lib/march_hare/channel.rb', line 757
    def basic_nack(delivery_tag, multiple = false, requeue = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 599
    def basic_publish(exchange, routing_key, mandatory, properties, body)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
      end
    end
#basic_qos(prefetch_count) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 621
    def basic_qos(prefetch_count)
      r = converting_rjc_exceptions_to_ruby do
        @delegate.basic_qos(prefetch_count)
      end
      @prefetch_count = prefetch_count

      r
    end
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages
    # File 'lib/march_hare/channel.rb', line 767
    def basic_recover(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover(requeue)
      end
    end
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages
    # File 'lib/march_hare/channel.rb', line 777
    def basic_recover_async(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover_async(requeue)
      end
    end
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
    # File 'lib/march_hare/channel.rb', line 728
    def basic_reject(delivery_tag, requeue)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_reject(delivery_tag.to_i, requeue)
      end
    end
#channel_number ⇒ Integer Also known as: id, number
Returns Channel id.
    # File 'lib/march_hare/channel.rb', line 148
    def channel_number
      @delegate.channel_number
    end
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.
    # File 'lib/march_hare/channel.rb', line 164
    def close(code = 200, reason = "Goodbye")
      v = @delegate.close(code, reason)

      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end

      @connection.unregister_channel(self)

      v
    end
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
    # File 'lib/march_hare/channel.rb', line 790
    def confirm_select
      converting_rjc_exceptions_to_ruby do
        @confirm_mode = true
        @delegate.confirm_select
      end
    end
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions thrown by the RabbitMQ Java client and converting them to Ruby exceptions that are then re-raised.
    # File 'lib/march_hare/channel.rb', line 966
    def converting_rjc_exceptions_to_ruby(&block)
      begin
        block.call
      rescue Exception, java.lang.Throwable => e
        Exceptions.convert_and_reraise(e)
      end
    end
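The same translate-and-re-raise idea can be sketched in plain Ruby, without JRuby or the Java client. Everything here is a stand-in: `LowLevelError` plays the role of a Java client exception, `ChannelError` the role of the Ruby exception callers rescue, and `converting_low_level_errors` is a hypothetical name. How `Exceptions.convert_and_reraise` actually maps exceptions is not shown on this page.

```ruby
# Stand-ins for the two layers of exceptions.
class LowLevelError < StandardError; end
class ChannelError < StandardError; end

# Runs the block and re-raises low-level failures as ChannelError, keeping the
# original message. Ruby records the original exception as #cause.
def converting_low_level_errors
  yield
rescue LowLevelError => e
  raise ChannelError, e.message
end

begin
  converting_low_level_errors { raise LowLevelError, "NOT_FOUND - no queue 'jobs'" }
rescue ChannelError => e
  translated = e
end
```

Wrapping every delegate call this way means callers only ever need to rescue one family of exceptions.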
#default_exchange ⇒ Object
Provides access to the default exchange
    # File 'lib/march_hare/channel.rb', line 402
    def default_exchange
      @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
    end
#deregister_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 936
    def deregister_exchange(exchange)
      @exchanges.delete(exchange.name)
    end
#deregister_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 916
    def deregister_queue(queue)
      @queues.delete(queue.name)
    end
#deregister_queue_named(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 921
    def deregister_queue_named(name)
      @queues.delete(name)
    end
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 350
    def direct(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 308
    def exchange(name, options={})
      dx = Exchange.new(self, name, options).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)
    # File 'lib/march_hare/channel.rb', line 432
    def exchange_bind(destination, source, routing_key, arguments = nil)
      @delegate.exchange_bind(destination, source, routing_key, arguments)
    end
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares an exchange using exchange.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 417
    def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
      @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
    end
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)
    # File 'lib/march_hare/channel.rb', line 447
    def exchange_unbind(destination, source, routing_key, arguments = nil)
      @delegate.exchange_unbind(destination, source, routing_key, arguments)
    end
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 329
    def fanout(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#find_queue(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 931
    def find_queue(name)
      @queues[name]
    end
#gracefully_shut_down_consumers ⇒ Object
    # File 'lib/march_hare/channel.rb', line 956
    def gracefully_shut_down_consumers
      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end
    end
#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 975
    def guarding_against_stale_delivery_tags(tag, &block)
      case tag
      # if a fixnum was passed, execute unconditionally. MK.
      when Fixnum then
        block.call
      # versioned delivery tags should be checked to avoid
      # sending out stale (invalid) tags after channel was reopened
      # during network failure recovery. MK.
      when VersionedDeliveryTag then
        if !tag.stale?(@recoveries_counter.get)
          block.call
        end
      end
    end
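The guard can be illustrated with a small standalone sketch. `VersionedTag` is a hypothetical stand-in for `VersionedDeliveryTag`; the assumption that a tag is stale when its recorded version is lower than the channel's recovery count is made for illustration and is not confirmed by this page. The listing matches on `Fixnum`, which was removed in Ruby 3.2, so the sketch matches on `Integer` instead.

```ruby
# Hypothetical stand-in for VersionedDeliveryTag: a delivery tag plus the
# recovery count of the channel at the time the message was delivered.
VersionedTag = Struct.new(:tag, :version) do
  # Assumed rule: stale once the channel has been recovered since delivery.
  def stale?(current_version)
    version < current_version
  end

  def to_i
    tag
  end
end

# Yields only when the tag is safe to use on a channel that has been
# recovered `recoveries` times.
def guarding_against_stale_tags(tag, recoveries)
  case tag
  when Integer      then yield # plain tags are passed through unconditionally
  when VersionedTag then yield unless tag.stale?(recoveries)
  end
end

acked = []
guarding_against_stale_tags(7, 1) { acked << 7 }
guarding_against_stale_tags(VersionedTag.new(8, 1), 1) { acked << 8 }
guarding_against_stale_tags(VersionedTag.new(9, 0), 1) { acked << 9 } # skipped as stale
```

Silently skipping a stale tag avoids sending an acknowledgement the broker would treat as unknown on the reopened channel.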
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 392
    def headers(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#increment_recoveries_counter ⇒ Object
    # File 'lib/march_hare/channel.rb', line 286
    def increment_recoveries_counter
      @recoveries_counter.increment_and_get
    end
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.
    # File 'lib/march_hare/channel.rb', line 685
    def nack(delivery_tag, multiple = false, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
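A common way to combine `#ack` and `#nack` in a message handler is sketched below. `settle` and `requeue_on_error` are hypothetical names; the argument order follows the `#ack` and `#nack` signatures on this page.

```ruby
# Hypothetical helper: runs the block and acknowledges the delivery on success.
# If the block raises, the delivery is rejected (optionally requeued) and the
# error is re-raised.
def settle(ch, delivery_tag, requeue_on_error = false)
  begin
    result = yield
  rescue StandardError
    # nack(delivery_tag, multiple, requeue)
    ch.nack(delivery_tag, false, requeue_on_error)
    raise
  end
  ch.ack(delivery_tag)
  result
end
```

Requeueing by default is avoided here because a message that always fails would otherwise be redelivered indefinitely.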
#next_publisher_seq_no ⇒ Object
    # File 'lib/march_hare/channel.rb', line 821
    def next_publisher_seq_no
      @delegate.next_publisher_seq_no
    end
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler
    # File 'lib/march_hare/channel.rb', line 861
    def on_confirm(&block)
      ch = BlockConfirmListener.from(block)
      self.add_confirm_listener(ch)
      @confirm_hooks << ch
    end
#on_return(&block) ⇒ Object
Defines a returned message handler.
    # File 'lib/march_hare/channel.rb', line 855
    def on_return(&block)
      self.add_return_listener(BlockReturnListener.from(block))
    end
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcast when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.
    # File 'lib/march_hare/channel.rb', line 179
    def on_shutdown(&block)
      sh = ShutdownListener.new(self, &block)

      @shutdown_hooks << sh
      @delegate.add_shutdown_listener(sh)

      sh
    end
#open? ⇒ Boolean
Returns true if the channel is open.
    # File 'lib/march_hare/channel.rb', line 155
    def open?
      @delegate.open?
    end
#prefetch ⇒ Integer
Returns Active basic.qos prefetch setting.
    # File 'lib/march_hare/channel.rb', line 645
    def prefetch
      @prefetch_count || 0
    end
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages
    # File 'lib/march_hare/channel.rb', line 640
    def prefetch=(n)
      basic_qos(n)
    end
#qos(options = {}) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 630
    def qos(options={})
      basic_qos(options.fetch(:prefetch_count, 0))
    end
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
    # File 'lib/march_hare/channel.rb', line 469
    def queue(name, options={})
      dq = Queue.new(self, name, options).tap do |q|
        q.declare!
      end

      self.register_queue(dq)
    end
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 536
    def queue_bind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_bind(queue, exchange, routing_key, arguments)
      end
    end
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 492
    def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
      end
    end
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.
    # File 'lib/march_hare/channel.rb', line 504
    def queue_declare_passive(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare_passive(name)
      end
    end
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 519
    def queue_delete(name, if_empty = false, if_unused = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_delete(name, if_empty, if_unused)
      end
    end
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 565
    def queue_purge(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_purge(name)
      end
    end
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
    # File 'lib/march_hare/channel.rb', line 553
    def queue_unbind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_unbind(queue, exchange, routing_key, arguments)
      end
    end
#recover_confirm_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 219
    def recover_confirm_hooks
      @confirm_hooks.each do |ch|
        @delegate.add_confirm_listener(ch)
      end
    end
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 234
    def recover_confirm_mode
      confirm_select if @confirm_mode
    end
#recover_consumers ⇒ Object
Recovers consumers. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 273
    def recover_consumers
      @consumers.values.each do |c|
        begin
          self.unregister_consumer(c.consumer_tag)
          c.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering consumer #{c.consumer_tag}"
        end
      end
    end
#recover_exchanges ⇒ Object
Recovers exchanges. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 247
    def recover_exchanges
      @exchanges.values.each do |x|
        begin
          x.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering exchange #{x.name}"
        end
      end
    end
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 228
    def recover_prefetch_setting
      basic_qos(@prefetch_count) if @prefetch_count
    end
#recover_queues ⇒ Object
Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 260
    def recover_queues
      @queues.values.each do |q|
        begin
          q.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering queue #{q.name}"
        end
      end
    end
#recover_shutdown_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 212
    def recover_shutdown_hooks
      @shutdown_hooks.each do |sh|
        @delegate.add_shutdown_listener(sh)
      end
    end
#recover_tx_mode ⇒ Object
Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 240
    def recover_tx_mode
      tx_select if @tx_mode
    end
#register_consumer(consumer_tag, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 946
    def register_consumer(consumer_tag, consumer)
      @consumers[consumer_tag] = consumer
    end
#register_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 941
    def register_exchange(exchange)
      @exchanges[exchange.name] = exchange
    end
#register_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 926
    def register_queue(queue)
      @queues[queue.name] = queue
    end
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
    # File 'lib/march_hare/channel.rb', line 670
    def reject(delivery_tag, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_reject(delivery_tag.to_i, requeue)
      end
    end
#revive_with(java_ch) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 207
    def revive_with(java_ch)
      @delegate = java_ch
    end
#session ⇒ MarchHare::Session Also known as: client, connection
Returns Connection this channel is on.
    # File 'lib/march_hare/channel.rb', line 141
    def session
      @connection
    end
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 371
    def topic(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#tx_commit ⇒ Object
Commits a transaction
    # File 'lib/march_hare/channel.rb', line 840
    def tx_commit
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_commit
      end
    end
#tx_rollback ⇒ Object
Rolls back a transaction
    # File 'lib/march_hare/channel.rb', line 847
    def tx_rollback
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_rollback
      end
    end
#tx_select ⇒ Object
Enables transactions on the channel
    # File 'lib/march_hare/channel.rb', line 826
    def tx_select
      converting_rjc_exceptions_to_ruby do
        @tx_mode = true
        @delegate.tx_select
      end
    end
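The three transaction methods are typically used together. The sketch below wraps them in a hypothetical `in_transaction` helper that uses only `#using_tx?`, `#tx_select`, `#tx_commit` and `#tx_rollback` from this page.

```ruby
# Hypothetical helper: runs the block inside an AMQP transaction on `ch`,
# committing on success and rolling back if the block raises.
def in_transaction(ch)
  ch.tx_select unless ch.using_tx?
  begin
    result = yield ch
  rescue StandardError
    ch.tx_rollback
    raise
  end
  ch.tx_commit
  result
end
```

Once selected, transaction mode stays enabled on the channel, which is why the helper checks `#using_tx?` before calling `#tx_select` again.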
#unregister_consumer(consumer_tag) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 951
    def unregister_consumer(consumer_tag)
      @consumers.delete(consumer_tag)
    end
#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?
Returns true if publisher confirms are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 798
    def using_publisher_confirms?
      !!@confirm_mode
    end
#using_tx? ⇒ Boolean Also known as: uses_tx?
Returns true if transactions are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 834
    def using_tx?
      !!@tx_mode
    end
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Takes an optional timeout in milliseconds. Raises an exception if the timeout has occurred.
    # File 'lib/march_hare/channel.rb', line 811
    def wait_for_confirms(timeout = nil)
      if timeout
        converting_rjc_exceptions_to_ruby do
          @delegate.wait_for_confirms(timeout)
        end
      else
        @delegate.wait_for_confirms
      end
    end
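A minimal sketch of a confirmed publish built from `#confirm_select`, `#basic_publish` and `#wait_for_confirms`. `publish_confirmed`, its parameter names and the 5000 ms default are hypothetical. The payload is passed through untouched; the assumption here is that under JRuby the caller supplies bytes (for example via `String#to_java_bytes`), since `#basic_publish` hands the body straight to the Java client.

```ruby
# Hypothetical helper: publishes one message through the lower-level API and
# blocks until outstanding confirms arrive or the timeout (in ms) elapses.
def publish_confirmed(ch, exchange_name, routing_key, payload, timeout_ms = 5_000)
  ch.confirm_select unless ch.using_publisher_confirms?
  # basic_publish(exchange, routing_key, mandatory, properties, body)
  ch.basic_publish(exchange_name, routing_key, false, {}, payload)
  # Returns whatever #wait_for_confirms returns (documented as a Boolean).
  ch.wait_for_confirms(timeout_ms)
end
```

Waiting after every message is the simplest approach; publishing a batch and waiting once trades latency for throughput.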