Class: MarchHare::Channel
- Inherits:
-
Object
- Object
- MarchHare::Channel
- Defined in:
- lib/march_hare/channel.rb
Overview
## Channels in RabbitMQ
To quote AMQP 0.9.1 specification:
AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
## Opening Channels
Channels can be opened either via ‘MarchHare::Session#create_channel` (sufficient in the majority of cases) or by instantiating `MarchHare::Channel` directly:
This will automatically allocate a channel id.
## Closing Channels
Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.
## Higher-level API
MarchHare offers two sets of methods on Channel: known as higher-level and lower-level APIs, respectively. Higher-level API mimics amqp gem API where exchanges and queues are objects (instance of Exchange and Queue, respectively). Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).
### Queue Operations In Higher-level API
### Exchange Operations In Higher-level API
-
#topic declares a topic exchange. The rest of the API is in Exchange.
-
#direct declares a direct exchange.
-
#fanout declares a fanout exchange.
-
#headers declares a headers exchange.
-
#exchange is used to declare exchanges with type specified as a symbol or string.
## Channel Qos (Prefetch Level)
It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
## Channel IDs
Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.
There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.
## Channels and Error Handling
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying ‘channel.close` method information.
Defined Under Namespace
Classes: BlockConfirmListener, BlockReturnListener
Instance Attribute Summary collapse
-
#consumers ⇒ Array<MarchHare::Consumer>
readonly
Consumers on this channel.
-
#recoveries_counter ⇒ Object
readonly
Returns the value of attribute recoveries_counter.
Exchanges collapse
-
#default_exchange ⇒ Object
Provides access to the default exchange.
-
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
-
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension).
-
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares a echange using echange.declare AMQP 0.9.1 method.
-
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension).
-
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
-
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
-
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
Queues collapse
-
#durable_queue(name, type = Queue::Types::CLASSIC, opts = {}) ⇒ MarchHare::Queue
Declares a new server-named queue that is automatically deleted when the connection is closed.
-
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
-
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method.
-
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
-
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method.
-
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method.
-
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
-
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method.
-
#quorum_queue(name, opts = {}) ⇒ MarchHare::Queue
Declares a new client-named quorum queue.
-
#stream(name, opts = {}) ⇒ MarchHare::Queue
Declares a new client-named stream (that Bunny can use as if it was a queue).
-
#temporary_queue(opts = {}) ⇒ MarchHare::Queue
Declares a new server-named queue that is automatically deleted when the connection is closed.
basic.* collapse
-
#ack(delivery_tag, multiple = false) ⇒ Object
(also: #acknowledge)
Acknowledges a message.
-
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
- #basic_cancel(consumer_tag) ⇒ Object
- #basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
- #basic_get(queue, auto_ack) ⇒ Object
-
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
-
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
- #basic_qos(prefetch_count) ⇒ Object
-
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages.
-
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
-
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message.
-
#prefetch ⇒ Integer
Active basic.qos prefetch setting.
-
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages.
- #qos(options = {}) ⇒ Object
-
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message.
Instance Method Summary collapse
- #automatically_recover(session, java_connection) ⇒ Object
- #cancel_consumers_before_closing! ⇒ Object
- #cancel_consumers_before_closing? ⇒ Boolean
-
#channel_number ⇒ Integer
(also: #id, #number)
Channel id.
-
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
- #closed? ⇒ Boolean
- #configure(&block) ⇒ Object
-
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
-
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
- #deregister_exchange(exchange) ⇒ Object
- #deregister_queue(queue) ⇒ Object
- #deregister_queue_named(name) ⇒ Object
- #find_queue(name) ⇒ Object
- #gracefully_shut_down_consumers ⇒ Object
- #guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
- #increment_recoveries_counter ⇒ Object
-
#initialize(session, delegate) ⇒ Channel
constructor
A new instance of Channel.
-
#logger ⇒ ::Logger
Logger instance from the connection.
- #method_missing(selector, *args) ⇒ Object
- #next_publisher_seq_no ⇒ Object
-
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler.
-
#on_return(&block) ⇒ Object
Defines a returned message handler.
-
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback.
-
#open? ⇒ Boolean
True if the channel is open.
- #recover_confirm_hooks ⇒ Object
-
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode.
-
#recover_consumers ⇒ Object
Recovers consumers.
-
#recover_exchanges ⇒ Object
Recovers exchanges.
-
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting.
-
#recover_queues ⇒ Object
Recovers queues and bindings.
- #recover_shutdown_hooks ⇒ Object
-
#recover_tx_mode ⇒ Object
Recovers transaction mode.
- #register_consumer(consumer_tag, consumer) ⇒ Object
- #register_exchange(exchange) ⇒ Object
- #register_queue(queue) ⇒ Object
- #revive_with(java_ch) ⇒ Object
-
#session ⇒ MarchHare::Session
(also: #client, #connection)
Connection this channel is on.
-
#tx_commit ⇒ Object
Commits a transaction.
-
#tx_rollback ⇒ Object
Rolls back a transaction.
-
#tx_select ⇒ Object
Enables transactions on the channel.
- #unregister_consumer(consumer_tag) ⇒ Object
-
#using_publisher_confirms? ⇒ Boolean
(also: #uses_publisher_confirms?)
True if publisher confirms are enabled for this channel.
-
#using_tx? ⇒ Boolean
(also: #uses_tx?)
True if transactions are enabled for this channel.
-
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Constructor Details
#initialize(session, delegate) ⇒ Channel
Returns a new instance of Channel.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/march_hare/channel.rb', line 121 def initialize(session, delegate) @connection = session @delegate = delegate @exchanges = JavaConcurrent::ConcurrentHashMap.new @queues = JavaConcurrent::ConcurrentHashMap.new # we keep track of consumers in part to gracefully shut down their # executors when the channel is closed. This frees library users # from having to worry about this. MK. @consumers = JavaConcurrent::ConcurrentHashMap.new @shutdown_hooks = Array.new @confirm_hooks = Array.new @recoveries_counter = JavaConcurrent::AtomicInteger.new(0) # An opt-in setting that instructs the channel to cancel all consumers # before closing. This helps reduce the probability of in-flight deliveries # right before channel closure. @cancel_consumers_before_closing = false on_shutdown do |ch, cause| ch.gracefully_shut_down_consumers end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
1014 1015 1016 |
# File 'lib/march_hare/channel.rb', line 1014 def method_missing(selector, *args) @delegate.__send__(selector, *args) end |
Instance Attribute Details
#consumers ⇒ Array<MarchHare::Consumer> (readonly)
Returns Consumers on this channel.
118 119 120 |
# File 'lib/march_hare/channel.rb', line 118 def consumers @consumers end |
#recoveries_counter ⇒ Object (readonly)
Returns the value of attribute recoveries_counter.
341 342 343 |
# File 'lib/march_hare/channel.rb', line 341 def recoveries_counter @recoveries_counter end |
Instance Method Details
#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge
Acknowledges a message. Acknowledged messages are completely removed from the queue.
802 803 804 805 806 |
# File 'lib/march_hare/channel.rb', line 802 def ack(delivery_tag, multiple = false) (delivery_tag) do basic_ack(delivery_tag.to_i, multiple) end end |
#automatically_recover(session, java_connection) ⇒ Object
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/march_hare/channel.rb', line 235 def automatically_recover(session, java_connection) logger.debug("channel: begin automatic connection recovery") jch = java_connection.create_channel(id) self.revive_with(jch) self.recover_shutdown_hooks self.recover_prefetch_setting self.recover_confirm_mode self.recover_confirm_hooks self.recover_tx_mode self.recover_exchanges # # this includes bindings recovery self.recover_queues self.recover_consumers self.increment_recoveries_counter end |
#basic_ack(delivery_tag, multiple) ⇒ NilClass
Acknowledges one or more messages (deliveries).
888 889 890 891 892 |
# File 'lib/march_hare/channel.rb', line 888 def basic_ack(delivery_tag, multiple) converting_rjc_exceptions_to_ruby do @delegate.basic_ack(delivery_tag.to_i, multiple) end end |
#basic_cancel(consumer_tag) ⇒ Object
761 762 763 764 765 766 |
# File 'lib/march_hare/channel.rb', line 761 def basic_cancel(consumer_tag) converting_rjc_exceptions_to_ruby do @delegate.basic_cancel(consumer_tag) end self.unregister_consumer(consumer_tag) end |
#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object
747 748 749 750 751 752 753 754 755 756 757 758 759 |
# File 'lib/march_hare/channel.rb', line 747 def basic_consume(queue, auto_ack, consumer_tag = nil, consumer) consumer.auto_ack = auto_ack tag = converting_rjc_exceptions_to_ruby do if consumer_tag @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer) else @delegate.basic_consume(queue, auto_ack, consumer) end end self.register_consumer(tag, consumer) tag end |
#basic_get(queue, auto_ack) ⇒ Object
741 742 743 744 745 |
# File 'lib/march_hare/channel.rb', line 741 def basic_get(queue, auto_ack) converting_rjc_exceptions_to_ruby do @delegate.basic_get(queue, auto_ack) end end |
#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass
Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.
904 905 906 907 908 |
# File 'lib/march_hare/channel.rb', line 904 def basic_nack(delivery_tag, multiple = false, requeue = false) converting_rjc_exceptions_to_ruby do @delegate.basic_nack(delivery_tag.to_i, multiple, requeue) end end |
#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel
Publishes a message using basic.publish AMQP 0.9.1 method.
735 736 737 738 739 |
# File 'lib/march_hare/channel.rb', line 735 def basic_publish(exchange, routing_key, mandatory, properties, body) converting_rjc_exceptions_to_ruby do @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body) end end |
#basic_qos(prefetch_count) ⇒ Object
768 769 770 771 772 773 774 775 |
# File 'lib/march_hare/channel.rb', line 768 def basic_qos(prefetch_count) r = converting_rjc_exceptions_to_ruby do @delegate.basic_qos(prefetch_count) end @prefetch_count = prefetch_count r end |
#basic_recover(requeue = true) ⇒ Object
Redeliver unacknowledged messages
914 915 916 917 918 |
# File 'lib/march_hare/channel.rb', line 914 def basic_recover(requeue = true) converting_rjc_exceptions_to_ruby do @delegate.basic_recover(requeue) end end |
#basic_recover_async(requeue = true) ⇒ Object
Redeliver unacknowledged messages
924 925 926 927 928 |
# File 'lib/march_hare/channel.rb', line 924 def basic_recover_async(requeue = true) converting_rjc_exceptions_to_ruby do @delegate.basic_recover_async(requeue) end end |
#basic_reject(delivery_tag, requeue) ⇒ NilClass
Rejects or requeues a message.
875 876 877 878 879 |
# File 'lib/march_hare/channel.rb', line 875 def basic_reject(delivery_tag, requeue) converting_rjc_exceptions_to_ruby do @delegate.basic_reject(delivery_tag.to_i, requeue) end end |
#cancel_consumers_before_closing! ⇒ Object
214 215 216 |
# File 'lib/march_hare/channel.rb', line 214 def cancel_consumers_before_closing! @cancel_consumers_before_closing = true end |
#cancel_consumers_before_closing? ⇒ Boolean
218 219 220 |
# File 'lib/march_hare/channel.rb', line 218 def cancel_consumers_before_closing? !!@cancel_consumers_before_closing end |
#channel_number ⇒ Integer Also known as: id, number
Returns Channel id.
158 159 160 |
# File 'lib/march_hare/channel.rb', line 158 def channel_number @delegate.channel_number end |
#close(code = 200, reason = "Goodbye") ⇒ Object
Closes the channel.
Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/march_hare/channel.rb', line 178 def close(code = 200, reason = "Goodbye") # This is a best-effort attempt to cancel all consumers before closing the channel. # Retries are extremely unlikely to succeed, and the channel itself is about to be closed, # so we don't bother retrying. if self.cancel_consumers_before_closing? # cancelling a consumer involves using the same mutex, so avoid holding the lock keys = @consumers.keys keys.each do |ctag| begin self.basic_cancel(ctag) rescue Bunny::Exception # ignore rescue Bunny::ClientTimeout # ignore end end end v = @delegate.close(code, reason) @consumers.each do |tag, consumer| consumer.gracefully_shut_down end @connection.unregister_channel(self) v end |
#closed? ⇒ Boolean
169 170 171 |
# File 'lib/march_hare/channel.rb', line 169 def closed? !open? end |
#configure(&block) ⇒ Object
208 209 210 211 212 |
# File 'lib/march_hare/channel.rb', line 208 def configure(&block) block.call(self) if block_given? self end |
#confirm_select ⇒ NilClass
Enables publisher confirms on the channel.
937 938 939 940 941 942 |
# File 'lib/march_hare/channel.rb', line 937 def confirm_select converting_rjc_exceptions_to_ruby do @confirm_mode = true @delegate.confirm_select end end |
#converting_rjc_exceptions_to_ruby(&block) ⇒ Object
Executes a block, catching Java exceptions RabbitMQ Java client throws and transforms them to Ruby exceptions that are then re-raised.
1119 1120 1121 1122 1123 1124 1125 |
# File 'lib/march_hare/channel.rb', line 1119 def converting_rjc_exceptions_to_ruby(&block) begin block.call rescue Exception, java.lang.Throwable => e Exceptions.convert_and_reraise(e) end end |
#default_exchange ⇒ Object
Provides access to the default exchange
453 454 455 |
# File 'lib/march_hare/channel.rb', line 453 def default_exchange @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct") end |
#deregister_exchange(exchange) ⇒ Object
1085 1086 1087 1088 |
# File 'lib/march_hare/channel.rb', line 1085 def deregister_exchange(exchange) logger.debug("channel: deregister exchange #{exchange.name}") @exchanges.delete(exchange.name) end |
#deregister_queue(queue) ⇒ Object
1063 1064 1065 1066 |
# File 'lib/march_hare/channel.rb', line 1063 def deregister_queue(queue) logger.debug("channel: deregister queue #{queue.name}") @queues.delete(queue.name) end |
#deregister_queue_named(name) ⇒ Object
1069 1070 1071 |
# File 'lib/march_hare/channel.rb', line 1069 def deregister_queue_named(name) @queues.delete(name) end |
#direct(name, opts = {}) ⇒ MarchHare::Exchange
Declares a direct exchange or looks it up in the cache of previously declared exchanges.
401 402 403 404 405 406 407 |
# File 'lib/march_hare/channel.rb', line 401 def direct(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x| x.declare! end self.register_exchange(dx) end |
#durable_queue(name, type = Queue::Types::CLASSIC, opts = {}) ⇒ MarchHare::Queue
Declares a new server-named queue that is automatically deleted when the connection is closed.
585 586 587 588 589 590 591 592 593 594 595 596 597 598 |
# File 'lib/march_hare/channel.rb', line 585 def durable_queue(name, type = Queue::Types::CLASSIC, opts = {}) throw ArgumentError.new("queue name must not be nil") if name.nil? throw ArgumentError.new("queue name must not be empty (server-named durable queues do not make sense)") if name.empty? final_opts = opts.merge({ :type => type, :durable => true, # exclusive or auto-delete QQs do not make much sense :exclusive => false, :auto_delete => false }) self.queue(name, final_opts) end |
#exchange(name, options = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
359 360 361 362 363 364 365 |
# File 'lib/march_hare/channel.rb', line 359 def exchange(name, ={}) dx = Exchange.new(self, name, ).tap do |x| x.declare! end self.register_exchange(dx) end |
#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object
Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)
485 486 487 488 489 |
# File 'lib/march_hare/channel.rb', line 485 def exchange_bind(destination, source, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_bind(destination, source, routing_key, arguments) end end |
#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object
Declares a echange using echange.declare AMQP 0.9.1 method.
468 469 470 471 472 |
# File 'lib/march_hare/channel.rb', line 468 def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments) end end |
#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object
Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)
502 503 504 505 506 |
# File 'lib/march_hare/channel.rb', line 502 def exchange_unbind(destination, source, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.exchange_unbind(destination, source, routing_key, arguments) end end |
#fanout(name, opts = {}) ⇒ MarchHare::Exchange
Declares a fanout exchange or looks it up in the cache of previously declared exchanges.
380 381 382 383 384 385 386 |
# File 'lib/march_hare/channel.rb', line 380 def fanout(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x| x.declare! end self.register_exchange(dx) end |
#find_queue(name) ⇒ Object
1080 1081 1082 |
# File 'lib/march_hare/channel.rb', line 1080 def find_queue(name) @queues[name] end |
#gracefully_shut_down_consumers ⇒ Object
1109 1110 1111 1112 1113 |
# File 'lib/march_hare/channel.rb', line 1109 def gracefully_shut_down_consumers @consumers.each do |tag, consumer| consumer.gracefully_shut_down end end |
#guarding_against_stale_delivery_tags(tag, &block) ⇒ Object
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 |
# File 'lib/march_hare/channel.rb', line 1128 def (tag, &block) case tag # if a fixnum was passed, execute unconditionally. MK. when Integer then block.call # versioned delivery tags should be checked to avoid # sending out stale (invalid) tags after channel was reopened # during network failure recovery. MK. when VersionedDeliveryTag then if !tag.stale?(@recoveries_counter.get) block.call end end end |
#headers(name, opts = {}) ⇒ MarchHare::Exchange
Declares a headers exchange or looks it up in the cache of previously declared exchanges.
443 444 445 446 447 448 449 |
# File 'lib/march_hare/channel.rb', line 443 def headers(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x| x.declare! end self.register_exchange(dx) end |
#increment_recoveries_counter ⇒ Object
337 338 339 |
# File 'lib/march_hare/channel.rb', line 337 def increment_recoveries_counter @recoveries_counter.increment_and_get end |
#logger ⇒ ::Logger
Returns Logger instance from the connection.
153 154 155 |
# File 'lib/march_hare/channel.rb', line 153 def logger @connection.logger end |
#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.
832 833 834 835 836 |
# File 'lib/march_hare/channel.rb', line 832 def nack(delivery_tag, multiple = false, requeue = false) (delivery_tag) do basic_nack(delivery_tag.to_i, multiple, requeue) end end |
#next_publisher_seq_no ⇒ Object
968 969 970 |
# File 'lib/march_hare/channel.rb', line 968 def next_publisher_seq_no @delegate.next_publisher_seq_no end |
#on_confirm(&block) ⇒ Object
Defines a publisher confirm handler
1008 1009 1010 1011 1012 |
# File 'lib/march_hare/channel.rb', line 1008 def on_confirm(&block) ch = BlockConfirmListener.from(block) self.add_confirm_listener(ch) @confirm_hooks << ch end |
#on_return(&block) ⇒ Object
Defines a returned message handler.
1002 1003 1004 |
# File 'lib/march_hare/channel.rb', line 1002 def on_return(&block) self.add_return_listener(BlockReturnListener.from(block)) end |
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcasted when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.
225 226 227 228 229 230 231 232 |
# File 'lib/march_hare/channel.rb', line 225 def on_shutdown(&block) sh = ShutdownListener.new(self, &block) @shutdown_hooks << sh @delegate.add_shutdown_listener(sh) sh end |
#open? ⇒ Boolean
Returns true if the channel is open.
165 166 167 |
# File 'lib/march_hare/channel.rb', line 165 def open? @delegate.open? end |
#prefetch ⇒ Integer
Returns Active basic.qos prefetch setting.
792 793 794 |
# File 'lib/march_hare/channel.rb', line 792 def prefetch @prefetch_count || 0 end |
#prefetch=(n) ⇒ Object
Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages
787 788 789 |
# File 'lib/march_hare/channel.rb', line 787 def prefetch=(n) basic_qos(n) end |
#qos(options = {}) ⇒ Object
777 778 779 |
# File 'lib/march_hare/channel.rb', line 777 def qos(={}) basic_qos(.fetch(:prefetch_count, 0)) end |
#queue(name, options = {}) ⇒ MarchHare::Queue
Declares a queue or looks it up in the per-channel cache.
526 527 528 529 530 531 532 |
# File 'lib/march_hare/channel.rb', line 526 def queue(name, = {}) dq = Queue.new(self, name, ).tap do |q| q.declare! end self.register_queue(dq) end |
#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
672 673 674 675 676 |
# File 'lib/march_hare/channel.rb', line 672 def queue_bind(queue, exchange, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.queue_bind(queue, exchange, routing_key, arguments) end end |
#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object
Declares a queue using queue.declare AMQP 0.9.1 method.
628 629 630 631 632 |
# File 'lib/march_hare/channel.rb', line 628 def queue_declare(name, durable, exclusive, auto_delete, arguments = {}) converting_rjc_exceptions_to_ruby do @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments) end end |
#queue_declare_passive(name) ⇒ Object
Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.
640 641 642 643 644 |
# File 'lib/march_hare/channel.rb', line 640 def queue_declare_passive(name) converting_rjc_exceptions_to_ruby do @delegate.queue_declare_passive(name) end end |
#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object
Deletes a queue using queue.delete AMQP 0.9.1 method
655 656 657 658 659 |
# File 'lib/march_hare/channel.rb', line 655 def queue_delete(name, if_empty = false, if_unused = false) converting_rjc_exceptions_to_ruby do @delegate.queue_delete(name, if_empty, if_unused) end end |
#queue_purge(name) ⇒ Object
Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
701 702 703 704 705 |
# File 'lib/march_hare/channel.rb', line 701 def queue_purge(name) converting_rjc_exceptions_to_ruby do @delegate.queue_purge(name) end end |
#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object
Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
689 690 691 692 693 |
# File 'lib/march_hare/channel.rb', line 689 def queue_unbind(queue, exchange, routing_key, arguments = nil) converting_rjc_exceptions_to_ruby do @delegate.queue_unbind(queue, exchange, routing_key, arguments) end end |
#quorum_queue(name, opts = {}) ⇒ MarchHare::Queue
Declares a new client-named quorum queue.
545 546 547 548 549 550 |
# File 'lib/march_hare/channel.rb', line 545 def quorum_queue(name, opts = {}) throw ArgumentError.new("quorum queue name must not be nil") if name.nil? throw ArgumentError.new("quorum queue name must not be empty (server-named QQs do not make sense)") if name.empty? durable_queue(name, Queue::Types::QUORUM, opts) end |
#recover_confirm_hooks ⇒ Object
267 268 269 270 271 |
# File 'lib/march_hare/channel.rb', line 267 def recover_confirm_hooks @confirm_hooks.each do |ch| @delegate.add_confirm_listener(ch) end end |
#recover_confirm_mode ⇒ Object
Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.
282 283 284 |
# File 'lib/march_hare/channel.rb', line 282 def recover_confirm_mode confirm_select if defined?(@confirm_mode) && @confirm_mode end |
#recover_consumers ⇒ Object
Recovers consumers. Used by the Automatic Network Failure Recovery feature.
323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/march_hare/channel.rb', line 323 def recover_consumers @consumers.values.each do |c| begin logger.debug("channel: recover consumer #{c.consumer_tag}") self.unregister_consumer(c.consumer_tag) c.recover_from_network_failure rescue Exception => e logger.error("Caught exception when recovering consumer #{c.consumer_tag}") logger.error(e) end end end |
#recover_exchanges ⇒ Object
Recovers exchanges. Used by the Automatic Network Failure Recovery feature.
295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/march_hare/channel.rb', line 295 def recover_exchanges @exchanges.values.each do |x| begin logger.debug("channel: recover exchange #{x.name}") x.recover_from_network_failure rescue Exception => e logger.error("Caught exception when recovering exchange #{x.name}") logger.error(e) end end end |
#recover_prefetch_setting ⇒ Object
Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.
276 277 278 |
# File 'lib/march_hare/channel.rb', line 276 def recover_prefetch_setting basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count end |
#recover_queues ⇒ Object
Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.
309 310 311 312 313 314 315 316 317 318 319 |
# File 'lib/march_hare/channel.rb', line 309 def recover_queues @queues.values.each do |q| begin logger.debug("channel: recover queue #{q.name}") q.recover_from_network_failure rescue Exception => e logger.error("Caught exception when recovering queue #{q.name}") logger.error(e) end end end |
#recover_shutdown_hooks ⇒ Object
260 261 262 263 264 |
# File 'lib/march_hare/channel.rb', line 260 def recover_shutdown_hooks @shutdown_hooks.each do |sh| @delegate.add_shutdown_listener(sh) end end |
#recover_tx_mode ⇒ Object
Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.
288 289 290 |
# File 'lib/march_hare/channel.rb', line 288 def recover_tx_mode tx_select if defined?(@tx_mode) && @tx_mode end |
#register_consumer(consumer_tag, consumer) ⇒ Object
1097 1098 1099 1100 |
# File 'lib/march_hare/channel.rb', line 1097 def register_consumer(consumer_tag, consumer) logger.debug("channel: register consumer #{consumer_tag}") @consumers[consumer_tag] = consumer end |
#register_exchange(exchange) ⇒ Object
1091 1092 1093 1094 |
# File 'lib/march_hare/channel.rb', line 1091 def register_exchange(exchange) logger.debug("channel: register exchange #{exchange.name}") @exchanges[exchange.name] = exchange end |
#register_queue(queue) ⇒ Object
1074 1075 1076 1077 |
# File 'lib/march_hare/channel.rb', line 1074 def register_queue(queue) logger.debug("channel: register queue #{queue.name}") @queues[queue.name] = queue end |
#reject(delivery_tag, requeue = false) ⇒ Object
Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.
817 818 819 820 821 |
# File 'lib/march_hare/channel.rb', line 817 def reject(delivery_tag, requeue = false) (delivery_tag) do basic_reject(delivery_tag.to_i, requeue) end end |
#revive_with(java_ch) ⇒ Object
255 256 257 |
# File 'lib/march_hare/channel.rb', line 255 def revive_with(java_ch) @delegate = java_ch end |
#session ⇒ MarchHare::Session Also known as: client, connection
Returns Connection this channel is on.
146 147 148 |
# File 'lib/march_hare/channel.rb', line 146 def session @connection end |
#stream(name, opts = {}) ⇒ MarchHare::Queue
Declares a new client-named stream (that Bunny can use as if it was a queue). Note that Bunny would still use AMQP 0-9-1 to perform operations on this “queue”. To use stream-specific operations and to gain from stream protocol efficiency and partitioning, use a Ruby client for the RabbitMQ stream protocol.
567 568 569 570 571 572 |
# File 'lib/march_hare/channel.rb', line 567 def stream(name, opts = {}) throw ArgumentError.new("stream name must not be nil") if name.nil? throw ArgumentError.new("stream name must not be empty (server-named QQs do not make sense)") if name.empty? durable_queue(name, Queue::Types::STREAM, opts) end |
#temporary_queue(opts = {}) ⇒ MarchHare::Queue
Declares a new server-named queue that is automatically deleted when the connection is closed.
606 607 608 609 610 611 |
# File 'lib/march_hare/channel.rb', line 606 def temporary_queue(opts = {}) temporary_queue_opts = { :exclusive => true } queue("", opts.merge(temporary_queue_opts)) end |
#topic(name, opts = {}) ⇒ MarchHare::Exchange
Declares a topic exchange or looks it up in the cache of previously declared exchanges.
422 423 424 425 426 427 428 |
# File 'lib/march_hare/channel.rb', line 422 def topic(name, opts = {}) dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x| x.declare! end self.register_exchange(dx) end |
#tx_commit ⇒ Object
Commits a transaction
987 988 989 990 991 |
# File 'lib/march_hare/channel.rb', line 987 def tx_commit converting_rjc_exceptions_to_ruby do @delegate.tx_commit end end |
#tx_rollback ⇒ Object
Rolls back a transaction
994 995 996 997 998 |
# File 'lib/march_hare/channel.rb', line 994 def tx_rollback converting_rjc_exceptions_to_ruby do @delegate.tx_rollback end end |
#tx_select ⇒ Object
Enables transactions on the channel
973 974 975 976 977 978 |
# File 'lib/march_hare/channel.rb', line 973 def tx_select converting_rjc_exceptions_to_ruby do @tx_mode = true @delegate.tx_select end end |
#unregister_consumer(consumer_tag) ⇒ Object
1103 1104 1105 1106 |
# File 'lib/march_hare/channel.rb', line 1103 def unregister_consumer(consumer_tag) logger.debug("channel: unregister consumer #{consumer_tag}") @consumers.delete(consumer_tag) end |
#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?
Returns true if publisher confirms are enabled for this channel.
945 946 947 |
# File 'lib/march_hare/channel.rb', line 945 def using_publisher_confirms? !!@confirm_mode end |
#using_tx? ⇒ Boolean Also known as: uses_tx?
Returns true if transactions are enabled for this channel.
981 982 983 |
# File 'lib/march_hare/channel.rb', line 981 def using_tx? !!@tx_mode end |
#wait_for_confirms(timeout = nil) ⇒ Boolean
Waits until all outstanding publisher confirms arrive.
Takes an optional timeout in milliseconds. Will raise an exception in case a timeout has occured.
958 959 960 961 962 963 964 965 966 |
# File 'lib/march_hare/channel.rb', line 958 def wait_for_confirms(timeout = nil) if timeout converting_rjc_exceptions_to_ruby do @delegate.wait_for_confirms(timeout) end else @delegate.wait_for_confirms end end |