Class: AMQ::Client::Async::Channel
- Inherits:
-
Object
- Object
- AMQ::Client::Async::Channel
- Extended by:
- ProtocolMethodHandlers, RegisterEntityMixin
- Includes:
- Entity, Extensions::RabbitMQ::Basic::ChannelMixin, Extensions::RabbitMQ::Confirm::ChannelMixin
- Defined in:
- lib/amq/client/async/extensions/rabbitmq/basic.rb,
lib/amq/client/async/channel.rb,
lib/amq/client/async/extensions/rabbitmq/confirm.rb
Overview
Extensions
Constant Summary collapse
- DEFAULT_REPLY_TEXT =
"Goodbye".freeze
- RECOVERY_EVENTS =
[:after_connection_interruption, :before_recovery, :after_recovery].freeze
Constants included from Openable
Instance Attribute Summary collapse
-
#consumers_awaiting_cancel_ok ⇒ Object
readonly
Returns the value of attribute consumers_awaiting_cancel_ok.
-
#consumers_awaiting_consume_ok ⇒ Object
readonly
Returns the value of attribute consumers_awaiting_consume_ok.
-
#exchanges_awaiting_declare_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_declare_ok.
-
#exchanges_awaiting_delete_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_delete_ok.
-
#flow_is_active ⇒ Object
Returns the value of attribute flow_is_active.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#queues_awaiting_bind_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_bind_ok.
-
#queues_awaiting_declare_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_declare_ok.
-
#queues_awaiting_delete_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_delete_ok.
-
#queues_awaiting_get_response ⇒ Object
readonly
Returns the value of attribute queues_awaiting_get_response.
-
#queues_awaiting_purge_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_purge_ok.
-
#queues_awaiting_unbind_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_unbind_ok.
Attributes included from Extensions::RabbitMQ::Confirm::ChannelMixin
Attributes included from Entity
Attributes included from Openable
Channel lifecycle collapse
-
#close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) ⇒ Object
Closes AMQP channel.
-
#open(&block) ⇒ Object
(also: #reopen)
Opens AMQP channel.
Message acknowledgements collapse
-
#acknowledge(delivery_tag, multiple = false) ⇒ Object
Acknowledge one or all messages on the channel.
-
#recover(requeue = true, &block) ⇒ Channel
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
-
#reject(delivery_tag, requeue = true) ⇒ Object
Reject a message with given delivery tag.
QoS and flow handling collapse
-
#flow(active = false, &block) ⇒ Object
Asks the peer to pause or restart the flow of content data sent to a consumer.
-
#flow_is_active? ⇒ Boolean
True if flow in this channel is active (messages will be delivered to consumers that use this channel).
-
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
Requests a specific quality of service.
Transactions collapse
-
#tx_commit(&block) ⇒ Object
Commits AMQP transaction.
-
#tx_rollback(&block) ⇒ Object
Rolls AMQP transaction back.
-
#tx_select(&block) ⇒ Object
Sets the channel to use standard transactions.
Error handling collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_error(&block) ⇒ Object
Defines a callback that will be executed when channel is closed after channel-level exception.
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed after AMQP connection has recovered after a network failure.
Instance Method Summary collapse
-
#auto_recovering? ⇒ Boolean
True if this channel uses automatic recovery mode.
-
#connection ⇒ AMQ::Client::Connection
AMQP connection this channel belongs to.
- #consumers ⇒ Hash<String, Consumer>
-
#exchanges ⇒ Array<Exchange>
Collection of exchanges that were declared on this channel.
-
#find_exchange(name) ⇒ AMQ::Client::Exchange
Finds exchange in the exchanges cache on this channel by name.
-
#initialize(connection, id, options = {}) ⇒ Channel
constructor
A new instance of Channel.
-
#queues ⇒ Array<Queue>
Collection of queues that were declared on this channel.
-
#register_exchange(exchange) ⇒ Object
Implementation.
-
#synchronize(&block) ⇒ Object
Synchronizes given block using this channel’s mutex.
Methods included from RegisterEntityMixin
Methods included from ProtocolMethodHandlers
Methods included from Extensions::RabbitMQ::Confirm::ChannelMixin
#confirm_select, #handle_basic_ack, #handle_basic_nack, #handle_select_ok, #increment_publisher_index!, #on_ack, #on_nack, #reset_publisher_index!, #uses_publisher_confirmations?
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(connection, id, options = {}) ⇒ Channel
Returns a new instance of Channel.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/amq/client/async/channel.rb', line 39 def initialize(connection, id, = {}) super(connection) @id = id @exchanges = Hash.new @queues = Hash.new @consumers = Hash.new @options = { :auto_recovery => connection.auto_recovering? }.merge() @auto_recovery = (!!@options[:auto_recovery]) # we must synchronize frameset delivery. MK. @mutex = Mutex.new reset_state! # 65536 is here for cases when channel is opened without passing a callback in, # otherwise channel_mix would be nil and it causes a lot of needless headaches. # lets just have this default. MK. channel_max = if @connection.open? @connection.channel_max || 65536 else 65536 end if channel_max != 0 && !(0..channel_max).include?(id) raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}") end end |
Instance Attribute Details
#consumers_awaiting_cancel_ok ⇒ Object (readonly)
Returns the value of attribute consumers_awaiting_cancel_ok.
34 35 36 |
# File 'lib/amq/client/async/channel.rb', line 34 def consumers_awaiting_cancel_ok @consumers_awaiting_cancel_ok end |
#consumers_awaiting_consume_ok ⇒ Object (readonly)
Returns the value of attribute consumers_awaiting_consume_ok.
34 35 36 |
# File 'lib/amq/client/async/channel.rb', line 34 def consumers_awaiting_consume_ok @consumers_awaiting_consume_ok end |
#exchanges_awaiting_declare_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_declare_ok.
32 33 34 |
# File 'lib/amq/client/async/channel.rb', line 32 def exchanges_awaiting_declare_ok @exchanges_awaiting_declare_ok end |
#exchanges_awaiting_delete_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_delete_ok.
32 33 34 |
# File 'lib/amq/client/async/channel.rb', line 32 def exchanges_awaiting_delete_ok @exchanges_awaiting_delete_ok end |
#flow_is_active ⇒ Object
Returns the value of attribute flow_is_active.
36 37 38 |
# File 'lib/amq/client/async/channel.rb', line 36 def flow_is_active @flow_is_active end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
30 31 32 |
# File 'lib/amq/client/async/channel.rb', line 30 def id @id end |
#queues_awaiting_bind_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_bind_ok.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_bind_ok @queues_awaiting_bind_ok end |
#queues_awaiting_declare_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_declare_ok.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_declare_ok @queues_awaiting_declare_ok end |
#queues_awaiting_delete_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_delete_ok.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_delete_ok @queues_awaiting_delete_ok end |
#queues_awaiting_get_response ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_get_response.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_get_response @queues_awaiting_get_response end |
#queues_awaiting_purge_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_purge_ok.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_purge_ok @queues_awaiting_purge_ok end |
#queues_awaiting_unbind_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_unbind_ok.
33 34 35 |
# File 'lib/amq/client/async/channel.rb', line 33 def queues_awaiting_unbind_ok @queues_awaiting_unbind_ok end |
Instance Method Details
#acknowledge(delivery_tag, multiple = false) ⇒ Object
Acknowledge one or all messages on the channel.
137 138 139 140 141 |
# File 'lib/amq/client/async/channel.rb', line 137 def acknowledge(delivery_tag, multiple = false) @connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple)) self end |
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
322 323 324 325 326 327 328 329 330 |
# File 'lib/amq/client/async/channel.rb', line 322 def auto_recover return unless auto_recovering? self.open do # exchanges must be recovered first because queue recovery includes recovery of bindings. MK. @exchanges.each { |name, e| e.auto_recover } @queues.each { |name, q| q.auto_recover } end end |
#auto_recovering? ⇒ Boolean
Returns true if this channel uses automatic recovery mode.
69 70 71 |
# File 'lib/amq/client/async/channel.rb', line 69 def auto_recovering? @auto_recovery end |
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
286 287 288 |
# File 'lib/amq/client/async/channel.rb', line 286 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end |
#close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) ⇒ Object
Closes AMQP channel.
121 122 123 124 125 |
# File 'lib/amq/client/async/channel.rb', line 121 def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) @connection.send_frame(Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id)) self.redefine_callback :close, &block end |
#connection ⇒ AMQ::Client::Connection
AMQP connection this channel belongs to.
93 94 95 |
# File 'lib/amq/client/async/channel.rb', line 93 def connection @connection end |
#consumers ⇒ Hash<String, Consumer>
75 76 77 |
# File 'lib/amq/client/async/channel.rb', line 75 def consumers @consumers end |
#exchanges ⇒ Array<Exchange>
Returns Collection of exchanges that were declared on this channel.
85 86 87 |
# File 'lib/amq/client/async/channel.rb', line 85 def exchanges @exchanges.values end |
#find_exchange(name) ⇒ AMQ::Client::Exchange
Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.
351 352 353 |
# File 'lib/amq/client/async/channel.rb', line 351 def find_exchange(name) @exchanges[name] end |
#flow(active = false, &block) ⇒ Object
Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flowcontrol mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.
196 197 198 199 200 201 |
# File 'lib/amq/client/async/channel.rb', line 196 def flow(active = false, &block) @connection.send_frame(Protocol::Channel::Flow.encode(@id, active)) self.redefine_callback :flow, &block self end |
#flow_is_active? ⇒ Boolean
Returns True if flow in this channel is active (messages will be delivered to consumers that use this channel).
206 207 208 |
# File 'lib/amq/client/async/channel.rb', line 206 def flow_is_active? @flow_is_active end |
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
266 267 268 |
# File 'lib/amq/client/async/channel.rb', line 266 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end |
#on_error(&block) ⇒ Object
Defines a callback that will be executed when channel is closed after channel-level exception.
257 258 259 |
# File 'lib/amq/client/async/channel.rb', line 257 def on_error(&block) self.define_callback(:error, &block) end |
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
304 305 306 |
# File 'lib/amq/client/async/channel.rb', line 304 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end |
#open(&block) ⇒ Object Also known as: reopen
Opens AMQP channel.
109 110 111 112 113 114 115 |
# File 'lib/amq/client/async/channel.rb', line 109 def open(&block) @connection.send_frame(Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING)) @connection.channels[@id] = self self.status = :opening self.redefine_callback :open, &block end |
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
RabbitMQ as of 2.3.1 does not support prefetch_size.
Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.
179 180 181 182 183 184 |
# File 'lib/amq/client/async/channel.rb', line 179 def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) @connection.send_frame(Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global)) self.redefine_callback :qos, &block self end |
#queues ⇒ Array<Queue>
Returns Collection of queues that were declared on this channel.
80 81 82 |
# File 'lib/amq/client/async/channel.rb', line 80 def queues @queues.values end |
#recover(requeue = true, &block) ⇒ Channel
RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
161 162 163 164 165 166 |
# File 'lib/amq/client/async/channel.rb', line 161 def recover(requeue = true, &block) @connection.send_frame(Protocol::Basic::Recover.encode(@id, requeue)) self.redefine_callback :recover, &block self end |
#register_exchange(exchange) ⇒ Object
Implementation
339 340 341 342 343 |
# File 'lib/amq/client/async/channel.rb', line 339 def register_exchange(exchange) raise ArgumentError, "argument is nil!" if exchange.nil? @exchanges[exchange.name] = exchange end |
#reject(delivery_tag, requeue = true) ⇒ Object
Reject a message with given delivery tag.
147 148 149 150 151 |
# File 'lib/amq/client/async/channel.rb', line 147 def reject(delivery_tag, requeue = true) @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue)) self end |
#synchronize(&block) ⇒ Object
Synchronizes given block using this channel’s mutex.
99 100 101 |
# File 'lib/amq/client/async/channel.rb', line 99 def synchronize(&block) @mutex.synchronize(&block) end |
#tx_commit(&block) ⇒ Object
Commits AMQP transaction.
230 231 232 233 234 235 |
# File 'lib/amq/client/async/channel.rb', line 230 def tx_commit(&block) @connection.send_frame(Protocol::Tx::Commit.encode(@id)) self.redefine_callback :tx_commit, &block self end |
#tx_rollback(&block) ⇒ Object
Rolls AMQP transaction back.
240 241 242 243 244 245 |
# File 'lib/amq/client/async/channel.rb', line 240 def tx_rollback(&block) @connection.send_frame(Protocol::Tx::Rollback.encode(@id)) self.redefine_callback :tx_rollback, &block self end |
#tx_select(&block) ⇒ Object
Sets the channel to use standard transactions. One must use this method at least once on a channel before using #tx_tommit or tx_rollback methods.
220 221 222 223 224 225 |
# File 'lib/amq/client/async/channel.rb', line 220 def tx_select(&block) @connection.send_frame(Protocol::Tx::Select.encode(@id)) self.redefine_callback :tx_select, &block self end |