Class: MarchHare::Channel
- Inherits: Object
  - Object
    - MarchHare::Channel
- Defined in: lib/march_hare/channel.rb
Overview
## Channels in RabbitMQ
To quote the AMQP 0.9.1 specification:
AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
## Opening Channels
Channels can be opened either via `MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly.
This will automatically allocate a channel id.
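A minimal sketch of the first approach, assuming `session` is an already-opened `MarchHare::Session` (for example, one returned by `MarchHare.connect`). The `with_channel` helper is hypothetical and not part of MarchHare; it only relies on `#create_channel` and `#close`.

```ruby
# Hypothetical helper (not part of MarchHare): opens a channel on the given
# session, yields it to the block and closes it when the block finishes.
# `session` is assumed to respond to #create_channel, as MarchHare::Session does.
def with_channel(session)
  ch = session.create_channel
  yield ch
ensure
  # Only close a channel that was actually opened.
  ch.close if ch
end

# Usage, assuming a RabbitMQ node is reachable with default settings:
#
#   conn = MarchHare.connect
#   with_channel(conn) { |ch| puts ch.id }
#   conn.close
```

If the block triggers a channel-level exception, the channel is already closed by the time `ensure` runs, so the `close` call may itself raise; real code would need to account for that.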
## Closing Channels
Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.
## Higher-level API
MarchHare offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).
### Queue Operations In Higher-level API
- #queue declares a queue or looks it up in the per-channel cache.
### Exchange Operations In Higher-level API
- #topic declares a topic exchange. The rest of the API is in Exchange.
- #direct declares a direct exchange.
- #fanout declares a fanout exchange.
- #headers declares a headers exchange.
- #exchange is used to declare exchanges with type specified as a symbol or string.
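To contrast the two API levels, here is a small sketch that declares one exchange through each. It assumes `ch` is an open `MarchHare::Channel`; the helper name and the exchange names are made up for illustration.

```ruby
# Hypothetical helper: declares one exchange through each API level.
# `ch` is assumed to be an open MarchHare::Channel.
def declare_both_ways(ch)
  # Higher-level API: returns an exchange object (MarchHare::Exchange).
  events = ch.fanout("sketch.events", :durable => true)

  # Lower-level API: name and type are plain strings and the arguments
  # mirror exchange.declare (name, type, durable, ...).
  ch.exchange_declare("sketch.audit", "topic", true)

  events
end
```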
## Channel QoS (Prefetch Level)
It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
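As a rough sketch of how the setting might be applied, assuming `ch` is an open `MarchHare::Channel` (the helper name is hypothetical):

```ruby
# Hypothetical helper: caps the number of unacknowledged deliveries that
# consumers on this channel may hold at once.
def limit_unacked_deliveries(ch, count)
  ch.prefetch = count   # delegates to basic_qos
  ch.prefetch           # reads back the value recorded by the channel
end
```

A value of 0 is what `#prefetch` reports when no limit has been set on the channel.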
## Channel IDs
Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.
There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.
## Channels and Error Handling
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying `channel.close` method information.
Defined Under Namespace
Classes: BlockConfirmListener, BlockReturnListener
Instance Attribute Summary
- #consumers ⇒ Array<MarchHare::Consumer> (readonly)
  Consumers on this channel.
- #recoveries_counter ⇒ Object (readonly)
  Returns the value of attribute recoveries_counter.
Exchanges
- #default_exchange ⇒ Object
  Provides access to the default exchange.
- #direct(name, opts = {}) ⇒ MarchHare::Exchange
  Declares a direct exchange or looks it up in the cache of previously declared exchanges.
- #exchange(name, options = {}) ⇒ MarchHare::Exchange
  Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
- #exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
  Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
- #exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
  Declares an exchange using exchange.declare AMQP 0.9.1 method.
- #exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
  Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
- #fanout(name, opts = {}) ⇒ MarchHare::Exchange
  Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
- #headers(name, opts = {}) ⇒ MarchHare::Exchange
  Declares a headers exchange or looks it up in the cache of previously declared exchanges.
- #topic(name, opts = {}) ⇒ MarchHare::Exchange
  Declares a topic exchange or looks it up in the cache of previously declared exchanges.
Queues
- #queue(name, options = {}) ⇒ MarchHare::Queue
  Declares a queue or looks it up in the per-channel cache.
- #queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
  Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
- #queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
  Declares a queue using queue.declare AMQP 0.9.1 method.
- #queue_declare_passive(name) ⇒ Object
  Checks if a queue exists using queue.declare AMQP 0.9.1 method.
- #queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
  Deletes a queue using queue.delete AMQP 0.9.1 method.
- #queue_purge(name) ⇒ Object
  Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
- #queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
  Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
basic.*
- #ack(delivery_tag, multiple = false) ⇒ Object (also: #acknowledge)
  Acknowledges a message.
- #basic_ack(delivery_tag, multiple) ⇒ NilClass
  Acknowledges one or more messages (deliveries).
- #basic_consume(queue, auto_ack, consumer) ⇒ Object
- #basic_get(queue, auto_ack) ⇒ Object
- #basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
  Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
- #basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
  Publishes a message using basic.publish AMQP 0.9.1 method.
- #basic_qos(prefetch_count) ⇒ Object
- #basic_recover(requeue = true) ⇒ Object
  Redeliver unacknowledged messages.
- #basic_recover_async(requeue = true) ⇒ Object
  Redeliver unacknowledged messages.
- #basic_reject(delivery_tag, requeue) ⇒ NilClass
  Rejects or requeues a message.
- #nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
  Rejects a message.
- #prefetch ⇒ Integer
  Active basic.qos prefetch setting.
- #prefetch=(n) ⇒ Object
  Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
- #qos(options = {}) ⇒ Object
- #reject(delivery_tag, requeue = false) ⇒ Object
  Rejects a message.
Instance Method Summary
- #automatically_recover(session, java_connection) ⇒ Object
- #channel_number ⇒ Integer (also: #id, #number)
  Channel id.
- #close(code = 200, reason = "Goodbye") ⇒ Object
  Closes the channel.
- #confirm_select ⇒ NilClass
  Enables publisher confirms on the channel.
- #converting_rjc_exceptions_to_ruby(&block) ⇒ Object
  Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
- #deregister_exchange(exchange) ⇒ Object
- #deregister_queue(queue) ⇒ Object
- #deregister_queue_named(name) ⇒ Object
- #find_queue(name) ⇒ Object
- #gracefully_shut_down_consumers ⇒ Object
- #guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
- #increment_recoveries_counter ⇒ Object
- #initialize(session, delegate) ⇒ Channel (constructor)
  A new instance of Channel.
- #method_missing(selector, *args) ⇒ Object
- #next_publisher_seq_no ⇒ Object
- #on_confirm(&block) ⇒ Object
  Defines a publisher confirm handler.
- #on_return(&block) ⇒ Object
  Defines a returned message handler.
- #on_shutdown(&block) ⇒ Object
  Defines a shutdown event callback.
- #recover_confirm_hooks ⇒ Object
- #recover_confirm_mode ⇒ Object
  Recovers publisher confirms mode.
- #recover_consumers ⇒ Object
  Recovers consumers.
- #recover_exchanges ⇒ Object
  Recovers exchanges.
- #recover_prefetch_setting ⇒ Object
  Recovers basic.qos setting.
- #recover_queues ⇒ Object
  Recovers queues and bindings.
- #recover_shutdown_hooks ⇒ Object
- #recover_tx_mode ⇒ Object
  Recovers transaction mode.
- #register_consumer(consumer_tag, consumer) ⇒ Object
- #register_exchange(exchange) ⇒ Object
- #register_queue(queue) ⇒ Object
- #revive_with(java_ch) ⇒ Object
- #session ⇒ MarchHare::Session (also: #client, #connection)
  Connection this channel is on.
- #tx_commit ⇒ Object
  Commits a transaction.
- #tx_rollback ⇒ Object
  Rolls back a transaction.
- #tx_select ⇒ Object
  Enables transactions on the channel.
- #unregister_consumer(consumer_tag) ⇒ Object
- #using_publisher_confirms? ⇒ Boolean (also: #uses_publisher_confirms?)
  True if publisher confirms are enabled for this channel.
- #using_tx? ⇒ Boolean (also: #uses_tx?)
  True if transactions are enabled for this channel.
- #wait_for_confirms(timeout = nil) ⇒ Boolean
  Waits until all outstanding publisher confirms arrive.
Constructor Details
#initialize(session, delegate) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/march_hare/channel.rb', line 121

    def initialize(session, delegate)
      @connection = session
      @delegate = delegate

      @exchanges = JavaConcurrent::ConcurrentHashMap.new
      @queues = JavaConcurrent::ConcurrentHashMap.new
      # we keep track of consumers in part to gracefully shut down their
      # executors when the channel is closed. This frees library users
      # from having to worry about this. MK.
      @consumers = JavaConcurrent::ConcurrentHashMap.new
      @shutdown_hooks = Array.new
      @confirm_hooks = Array.new
      @recoveries_counter = JavaConcurrent::AtomicInteger.new(0)

      on_shutdown do |ch, cause|
        ch.gracefully_shut_down_consumers
      end
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 862

    def method_missing(selector, *args)
      @delegate.__send__(selector, *args)
    end
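The forwarding idea can be illustrated in isolation with plain Ruby. Every class below is a stand-in invented for this sketch. Unlike the method above, the sketch also checks `respond_to?` before forwarding and defines `respond_to_missing?`; those are additions of the sketch, not something the source shows.

```ruby
# Stand-in for the wrapped Java channel; purely illustrative.
class SketchDelegate
  def is_open
    true
  end
end

# Minimal wrapper that forwards unknown messages to its delegate.
class SketchWrapper
  def initialize(delegate)
    @delegate = delegate
  end

  def method_missing(selector, *args, &block)
    if @delegate.respond_to?(selector)
      @delegate.__send__(selector, *args, &block)
    else
      super
    end
  end

  # Keeps respond_to? consistent with what method_missing will handle.
  def respond_to_missing?(selector, include_private = false)
    @delegate.respond_to?(selector, include_private) || super
  end
end

wrapper = SketchWrapper.new(SketchDelegate.new)
wrapper.is_open  # forwarded to SketchDelegate#is_open
```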
Instance Attribute Details
#consumers ⇒ Array<MarchHare::Consumer> (readonly)
Returns Consumers on this channel.
    # File 'lib/march_hare/channel.rb', line 118

    def consumers
      @consumers
    end
#recoveries_counter ⇒ Object (readonly)
Returns the value of attribute recoveries_counter.
    # File 'lib/march_hare/channel.rb', line 285

    def recoveries_counter
      @recoveries_counter
    end
Instance Method Details
#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge
Acknowledges a message. Acknowledged messages are completely removed from the queue.
    # File 'lib/march_hare/channel.rb', line 650

    def ack(delivery_tag, multiple = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_ack(delivery_tag.to_i, multiple)
      end
    end
#automatically_recover(session, java_connection) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 184

    def automatically_recover(session, java_connection)
      jch = java_connection.create_channel(id)

      self.revive_with(jch)
      self.recover_shutdown_hooks

      self.recover_prefetch_setting
      self.recover_confirm_mode
      self.recover_confirm_hooks
      self.recover_tx_mode
      self.recover_exchanges
      # this includes bindings recovery
      self.recover_queues
      self.recover_consumers
      self.increment_recoveries_counter
    end
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
    # File 'lib/march_hare/channel.rb', line 736

    def basic_ack(delivery_tag, multiple)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_ack(delivery_tag.to_i, multiple)
      end
    end
#basic_consume(queue, auto_ack, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 606

    def basic_consume(queue, auto_ack, consumer)
      consumer.auto_ack = auto_ack
      tag = converting_rjc_exceptions_to_ruby do
        @delegate.basic_consume(queue, auto_ack, consumer)
      end
      self.register_consumer(tag, consumer)

      tag
    end
#basic_get(queue, auto_ack) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 600

    def basic_get(queue, auto_ack)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_get(queue, auto_ack)
      end
    end
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
    # File 'lib/march_hare/channel.rb', line 752

    def basic_nack(delivery_tag, multiple = false, requeue = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 594

    def basic_publish(exchange, routing_key, mandatory, properties, body)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
      end
    end
#basic_qos(prefetch_count) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 616

    def basic_qos(prefetch_count)
      r = converting_rjc_exceptions_to_ruby do
        @delegate.basic_qos(prefetch_count)
      end
      @prefetch_count = prefetch_count

      r
    end
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
    # File 'lib/march_hare/channel.rb', line 762

    def basic_recover(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover(requeue)
      end
    end
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
    # File 'lib/march_hare/channel.rb', line 772

    def basic_recover_async(requeue = true)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_recover_async(requeue)
      end
    end
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
    # File 'lib/march_hare/channel.rb', line 723

    def basic_reject(delivery_tag, requeue)
      converting_rjc_exceptions_to_ruby do
        @delegate.basic_reject(delivery_tag.to_i, requeue)
      end
    end
#channel_number ⇒ Integer Also known as: id, number
Returns Channel id.
    # File 'lib/march_hare/channel.rb', line 148

    def channel_number
      @delegate.channel_number
    end
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
Closed channels can no longer be used. The id of a closed channel is returned to the pool of available ids and may be reused by a channel opened later.
    # File 'lib/march_hare/channel.rb', line 159

    def close(code = 200, reason = "Goodbye")
      v = @delegate.close(code, reason)

      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end

      @connection.unregister_channel(self)

      v
    end
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
    # File 'lib/march_hare/channel.rb', line 785

    def confirm_select
      converting_rjc_exceptions_to_ruby do
        @confirm_mode = true
        @delegate.confirm_select
      end
    end
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
    # File 'lib/march_hare/channel.rb', line 961

    def converting_rjc_exceptions_to_ruby(&block)
      begin
        block.call
      rescue Exception, java.lang.Throwable => e
        Exceptions.convert_and_reraise(e)
      end
    end
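The wrap-and-translate pattern can be sketched without JRuby or a broker. Both error classes and the helper below are hypothetical stand-ins; how MarchHare's `Exceptions.convert_and_reraise` actually maps Java exceptions is not shown in this document, so the mapping here is only an assumption for illustration.

```ruby
# Stand-in for a low-level client error carrying an AMQP reply code.
# Hypothetical; MarchHare deals with Java exceptions instead.
class SketchLowLevelError < StandardError
  attr_reader :reply_code

  def initialize(reply_code, message)
    super(message)
    @reply_code = reply_code
  end
end

# Stand-in for a Ruby-facing error such as NotFound.
class SketchNotFound < StandardError; end

# Runs the block and translates low-level errors into friendlier ones.
def converting_low_level_errors
  yield
rescue SketchLowLevelError => e
  # 404 is the AMQP 0.9.1 reply code for NOT_FOUND.
  raise SketchNotFound, e.message if e.reply_code == 404
  # Anything else is re-raised unchanged.
  raise
end
```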
#default_exchange ⇒ Object
Provides access to the default exchange.
    # File 'lib/march_hare/channel.rb', line 397

    def default_exchange
      @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
    end
#deregister_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 931

    def deregister_exchange(exchange)
      @exchanges.delete(exchange.name)
    end
#deregister_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 911

    def deregister_queue(queue)
      @queues.delete(queue.name)
    end
#deregister_queue_named(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 916

    def deregister_queue_named(name)
      @queues.delete(name)
    end
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 345

    def direct(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares an exchange of the type given in the options or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 303

    def exchange(name, options={})
      dx = Exchange.new(self, name, options).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
    # File 'lib/march_hare/channel.rb', line 427

    def exchange_bind(destination, source, routing_key, arguments = nil)
      @delegate.exchange_bind(destination, source, routing_key, arguments)
    end
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares an exchange using exchange.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 412

    def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
      @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
    end
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
    # File 'lib/march_hare/channel.rb', line 442

    def exchange_unbind(destination, source, routing_key, arguments = nil)
      @delegate.exchange_unbind(destination, source, routing_key, arguments)
    end
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 324

    def fanout(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#find_queue(name) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 926

    def find_queue(name)
      @queues[name]
    end
#gracefully_shut_down_consumers ⇒ Object
    # File 'lib/march_hare/channel.rb', line 951

    def gracefully_shut_down_consumers
      @consumers.each do |tag, consumer|
        consumer.gracefully_shut_down
      end
    end
#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 970

    def guarding_against_stale_delivery_tags(tag, &block)
      case tag
        # if a fixnum was passed, execute unconditionally. MK.
      when Fixnum then
        block.call
        # versioned delivery tags should be checked to avoid
        # sending out stale (invalid) tags after channel was reopened
        # during network failure recovery. MK.
      when VersionedDeliveryTag then
        if !tag.stale?(@recoveries_counter.get)
          block.call
        end
      end
    end
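The staleness check can be sketched in plain Ruby. `SketchVersionedTag` is a stand-in invented here; it assumes a versioned tag records the channel's recovery counter at delivery time and counts as stale once the counter has moved past that value. The real `VersionedDeliveryTag` is not shown in this document and may differ. The sketch matches on `Integer` because `Fixnum` is no longer available in recent Ruby versions.

```ruby
# Stand-in for a versioned delivery tag: the raw tag plus the value of the
# channel's recovery counter when the delivery arrived (an assumption).
SketchVersionedTag = Struct.new(:tag, :version) do
  # A tag is stale once the channel has recovered since it was issued.
  def stale?(current_version)
    version < current_version
  end

  def to_i
    tag
  end
end

# Runs the block unless the tag is known to be stale.
def guarding_against_stale_tags(tag, recoveries)
  case tag
  when Integer
    # Plain integers carry no version, so they are trusted as-is.
    yield
  when SketchVersionedTag
    yield unless tag.stale?(recoveries)
  end
end
```

With a recovery counter of 3, a tag issued at version 3 is still acted upon, while one issued at version 2 is silently skipped.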
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 387

    def headers(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#increment_recoveries_counter ⇒ Object
    # File 'lib/march_hare/channel.rb', line 281

    def increment_recoveries_counter
      @recoveries_counter.increment_and_get
    end
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.
    # File 'lib/march_hare/channel.rb', line 680

    def nack(delivery_tag, multiple = false, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_nack(delivery_tag.to_i, multiple, requeue)
      end
    end
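One way `#ack` and `#nack` might be combined when processing a single delivery is sketched below. The helper is hypothetical; it assumes `ch` is an open `MarchHare::Channel` and that the delivery was received with manual acknowledgements.

```ruby
# Hypothetical helper: runs the block for one delivery and settles it.
def settle_delivery(ch, delivery_tag)
  begin
    yield
  rescue StandardError
    # Failure: reject this single delivery and ask the broker to requeue it.
    ch.nack(delivery_tag, false, true)
    return false
  end

  # Success: acknowledge this delivery only (multiple = false by default).
  ch.ack(delivery_tag)
  true
end
```

Requeueing on every failure can cause a message that always fails to be redelivered indefinitely, so real code would usually cap retries or drop such messages.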
#next_publisher_seq_no ⇒ Object
    # File 'lib/march_hare/channel.rb', line 816

    def next_publisher_seq_no
      @delegate.next_publisher_seq_no
    end
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler.
    # File 'lib/march_hare/channel.rb', line 856

    def on_confirm(&block)
      ch = BlockConfirmListener.from(block)
      self.add_confirm_listener(ch)
      @confirm_hooks << ch
    end
#on_return(&block) ⇒ Object
Defines a returned message handler.
    # File 'lib/march_hare/channel.rb', line 850

    def on_return(&block)
      self.add_return_listener(BlockReturnListener.from(block))
    end
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcast when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.
    # File 'lib/march_hare/channel.rb', line 174

    def on_shutdown(&block)
      sh = ShutdownListener.new(self, &block)

      @shutdown_hooks << sh
      @delegate.add_shutdown_listener(sh)

      sh
    end
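A small sketch of registering a shutdown callback, assuming `ch` is an open `MarchHare::Channel`. The helper name is hypothetical, and the two block arguments follow the `|ch, cause|` form used in the constructor shown earlier.

```ruby
# Hypothetical helper: collects shutdown causes for later inspection.
def track_shutdowns(ch, log = [])
  ch.on_shutdown do |channel, cause|
    log << cause
  end
  log
end
```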
#prefetch ⇒ Integer
Returns Active basic.qos prefetch setting.
    # File 'lib/march_hare/channel.rb', line 640

    def prefetch
      @prefetch_count || 0
    end
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
    # File 'lib/march_hare/channel.rb', line 635

    def prefetch=(n)
      basic_qos(n)
    end
#qos(options = {}) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 625

    def qos(options={})
      basic_qos(options.fetch(:prefetch_count, 0))
    end
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
    # File 'lib/march_hare/channel.rb', line 464

    def queue(name, options={})
      dq = Queue.new(self, name, options).tap do |q|
        q.declare!
      end

      self.register_queue(dq)
    end
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 531

    def queue_bind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_bind(queue, exchange, routing_key, arguments)
      end
    end
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 487

    def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
      end
    end
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.
    # File 'lib/march_hare/channel.rb', line 499

    def queue_declare_passive(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_declare_passive(name)
      end
    end
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 514

    def queue_delete(name, if_empty = false, if_unused = false)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_delete(name, if_empty, if_unused)
      end
    end
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 560

    def queue_purge(name)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_purge(name)
      end
    end
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
    # File 'lib/march_hare/channel.rb', line 548

    def queue_unbind(queue, exchange, routing_key, arguments = nil)
      converting_rjc_exceptions_to_ruby do
        @delegate.queue_unbind(queue, exchange, routing_key, arguments)
      end
    end
#recover_confirm_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 214

    def recover_confirm_hooks
      @confirm_hooks.each do |ch|
        @delegate.add_confirm_listener(ch)
      end
    end
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 229

    def recover_confirm_mode
      confirm_select if @confirm_mode
    end
#recover_consumers ⇒ Object
Recovers consumers. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 268

    def recover_consumers
      @consumers.values.each do |c|
        begin
          self.unregister_consumer(c.consumer_tag)
          c.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering consumer #{c.consumer_tag}"
        end
      end
    end
#recover_exchanges ⇒ Object
Recovers exchanges. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 242

    def recover_exchanges
      @exchanges.values.each do |x|
        begin
          x.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering exchange #{x.name}"
        end
      end
    end
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 223

    def recover_prefetch_setting
      basic_qos(@prefetch_count) if @prefetch_count
    end
#recover_queues ⇒ Object
Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 255

    def recover_queues
      @queues.values.each do |q|
        begin
          q.recover_from_network_failure
        rescue Exception => e
          # TODO: logger
          $stderr.puts "Caught exception when recovering queue #{q.name}"
        end
      end
    end
#recover_shutdown_hooks ⇒ Object
    # File 'lib/march_hare/channel.rb', line 207

    def recover_shutdown_hooks
      @shutdown_hooks.each do |sh|
        @delegate.add_shutdown_listener(sh)
      end
    end
#recover_tx_mode ⇒ Object
Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.
    # File 'lib/march_hare/channel.rb', line 235

    def recover_tx_mode
      tx_select if @tx_mode
    end
#register_consumer(consumer_tag, consumer) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 941

    def register_consumer(consumer_tag, consumer)
      @consumers[consumer_tag] = consumer
    end
#register_exchange(exchange) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 936

    def register_exchange(exchange)
      @exchanges[exchange.name] = exchange
    end
#register_queue(queue) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 921

    def register_queue(queue)
      @queues[queue.name] = queue
    end
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
    # File 'lib/march_hare/channel.rb', line 665

    def reject(delivery_tag, requeue = false)
      guarding_against_stale_delivery_tags(delivery_tag) do
        basic_reject(delivery_tag.to_i, requeue)
      end
    end
#revive_with(java_ch) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 202

    def revive_with(java_ch)
      @delegate = java_ch
    end
#session ⇒ MarchHare::Session Also known as: client, connection
Returns Connection this channel is on.
    # File 'lib/march_hare/channel.rb', line 141

    def session
      @connection
    end
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
    # File 'lib/march_hare/channel.rb', line 366

    def topic(name, opts = {})
      dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
        x.declare!
      end

      self.register_exchange(dx)
    end
#tx_commit ⇒ Object
Commits a transaction.
    # File 'lib/march_hare/channel.rb', line 835

    def tx_commit
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_commit
      end
    end
#tx_rollback ⇒ Object
Rolls back a transaction.
    # File 'lib/march_hare/channel.rb', line 842

    def tx_rollback
      converting_rjc_exceptions_to_ruby do
        @delegate.tx_rollback
      end
    end
#tx_select ⇒ Object
Enables transactions on the channel.
    # File 'lib/march_hare/channel.rb', line 821

    def tx_select
      converting_rjc_exceptions_to_ruby do
        @tx_mode = true
        @delegate.tx_select
      end
    end
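The three `tx_*` methods are typically used together. The helper below is a hypothetical sketch of one way to combine them; it assumes `ch` is an open `MarchHare::Channel`.

```ruby
# Hypothetical helper: runs the block inside an AMQP transaction on `ch`,
# committing on success and rolling back if the block raises.
def in_transaction(ch)
  # Enable transaction mode once per channel.
  ch.tx_select unless ch.using_tx?

  begin
    result = yield ch
  rescue StandardError
    ch.tx_rollback
    raise
  end

  ch.tx_commit
  result
end
```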
#unregister_consumer(consumer_tag) ⇒ Object
    # File 'lib/march_hare/channel.rb', line 946

    def unregister_consumer(consumer_tag)
      @consumers.delete(consumer_tag)
    end
#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?
Returns true if publisher confirms are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 793

    def using_publisher_confirms?
      !!@confirm_mode
    end
#using_tx? ⇒ Boolean Also known as: uses_tx?
Returns true if transactions are enabled for this channel.
    # File 'lib/march_hare/channel.rb', line 829

    def using_tx?
      !!@tx_mode
    end
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Takes an optional timeout in milliseconds. Raises an exception if the timeout has elapsed.
    # File 'lib/march_hare/channel.rb', line 806

    def wait_for_confirms(timeout = nil)
      if timeout
        converting_rjc_exceptions_to_ruby do
          @delegate.wait_for_confirms(timeout)
        end
      else
        @delegate.wait_for_confirms
      end
    end
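A possible publish-and-wait flow built from `#confirm_select`, `#basic_publish` and `#wait_for_confirms` is sketched below. The helper and its default timeout are hypothetical. It assumes `ch` is an open `MarchHare::Channel` and that `payload` is already in the form `#basic_publish` expects; on JRuby a String may first need converting to Java bytes, for example with `String#to_java_bytes`.

```ruby
# Hypothetical helper: publishes one message and blocks until the broker
# confirms it or the timeout (in milliseconds) elapses.
def publish_confirmed(ch, exchange_name, routing_key, payload, timeout_ms = 5_000)
  # Publisher confirms only need to be enabled once per channel.
  ch.confirm_select unless ch.using_publisher_confirms?

  # mandatory = false; nil properties are replaced by an empty Hash
  # inside basic_publish.
  ch.basic_publish(exchange_name, routing_key, false, nil, payload)

  ch.wait_for_confirms(timeout_ms)
end
```

Waiting after every message is simple but slow; publishing a batch and then calling `#wait_for_confirms` once is a common alternative.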