Class: AMQ::Client::Async::Channel

Inherits:
Object
  • Object
show all
Extended by:
ProtocolMethodHandlers, RegisterEntityMixin
Includes:
Entity, Extensions::RabbitMQ::Basic::ChannelMixin, Extensions::RabbitMQ::Confirm::ChannelMixin
Defined in:
lib/amq/client/async/extensions/rabbitmq/basic.rb,
lib/amq/client/async/channel.rb,
lib/amq/client/async/extensions/rabbitmq/confirm.rb

Overview

Extensions

Constant Summary collapse

DEFAULT_REPLY_TEXT =
"Goodbye".freeze
RECOVERY_EVENTS =
[:after_connection_interruption, :before_recovery, :after_recovery].freeze

Constants included from Openable

Openable::VALUES

Instance Attribute Summary collapse

Attributes included from Extensions::RabbitMQ::Confirm::ChannelMixin

#publisher_index

Attributes included from Entity

#callbacks

Attributes included from Openable

#status

Channel lifecycle collapse

Message acknowledgements collapse

QoS and flow handling collapse

Transactions collapse

Error handling collapse

Instance Method Summary collapse

Methods included from RegisterEntityMixin

register_entity

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Extensions::RabbitMQ::Confirm::ChannelMixin

#confirm_select, #handle_basic_ack, #handle_basic_nack, #handle_select_ok, #increment_publisher_index!, #on_ack, #on_nack, #reset_publisher_index!, #uses_publisher_confirmations?

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(connection, id, options = {}) ⇒ Channel

Returns a new instance of Channel.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/amq/client/async/channel.rb', line 39

# Builds a channel attached to +connection+ and validates the requested channel id.
#
# @param connection [Object] connection this channel belongs to; passed to +super+ and
#   queried here for +auto_recovering?+, +open?+ and +channel_max+
# @param id [Integer] channel id
# @param options [Hash] channel options; +:auto_recovery+ overrides the connection's setting
# @raise [ArgumentError] when the effective channel_max is non-zero and +id+ lies outside 0..channel_max
def initialize(connection, id, options = {})
  super(connection)

  @id        = id
  @exchanges = {}
  @queues    = {}
  @consumers = {}

  # Explicitly passed options win over the connection-wide auto recovery setting.
  defaults       = { :auto_recovery => connection.auto_recovering? }
  @options       = defaults.merge(options)
  @auto_recovery = @options[:auto_recovery] ? true : false

  # Frameset delivery has to be synchronized, hence the per-channel mutex. MK.
  @mutex = Mutex.new

  reset_state!

  # 65536 is the fallback for channels opened without passing a callback in (the connection
  # is not open yet), otherwise channel_max would be nil and cause a lot of needless
  # headaches. MK.
  channel_max = (@connection.open? && @connection.channel_max) || 65536

  # A channel_max of 0 disables the range check altogether.
  return if channel_max == 0 || (0..channel_max).include?(id)

  raise ArgumentError, "Max channel for the connection is #{channel_max}, given: #{id}"
end

Instance Attribute Details

#consumers_awaiting_cancel_ok ⇒ Object (readonly)

Returns the value of attribute consumers_awaiting_cancel_ok.



34
35
36
# File 'lib/amq/client/async/channel.rb', line 34

# Read-only accessor: returns @consumers_awaiting_cancel_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks consumers
# whose basic.cancel-ok reply is still pending — confirm where it is populated.
def consumers_awaiting_cancel_ok
  @consumers_awaiting_cancel_ok
end

#consumers_awaiting_consume_ok ⇒ Object (readonly)

Returns the value of attribute consumers_awaiting_consume_ok.



34
35
36
# File 'lib/amq/client/async/channel.rb', line 34

# Read-only accessor: returns @consumers_awaiting_consume_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks consumers
# whose basic.consume-ok reply is still pending — confirm where it is populated.
def consumers_awaiting_consume_ok
  @consumers_awaiting_consume_ok
end

#exchanges_awaiting_declare_ok ⇒ Object (readonly)

Returns the value of attribute exchanges_awaiting_declare_ok.



32
33
34
# File 'lib/amq/client/async/channel.rb', line 32

# Read-only accessor: returns @exchanges_awaiting_declare_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks exchanges
# whose exchange.declare-ok reply is still pending — confirm where it is populated.
def exchanges_awaiting_declare_ok
  @exchanges_awaiting_declare_ok
end

#exchanges_awaiting_delete_ok ⇒ Object (readonly)

Returns the value of attribute exchanges_awaiting_delete_ok.



32
33
34
# File 'lib/amq/client/async/channel.rb', line 32

# Read-only accessor: returns @exchanges_awaiting_delete_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks exchanges
# whose exchange.delete-ok reply is still pending — confirm where it is populated.
def exchanges_awaiting_delete_ok
  @exchanges_awaiting_delete_ok
end

#flow_is_active ⇒ Object

Returns the value of attribute flow_is_active.



36
37
38
# File 'lib/amq/client/async/channel.rb', line 36

# Reader half of the flow_is_active attribute: returns @flow_is_active unchanged
# (the value is not coerced to true/false here).
# NOTE(review): the ivar is assigned outside this excerpt — confirm its initial value.
def flow_is_active
  @flow_is_active
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



30
31
32
# File 'lib/amq/client/async/channel.rb', line 30

# Read-only accessor for the channel id (@id), which the constructor stores from its
# +id+ argument.
def id
  @id
end

#queues_awaiting_bind_ok ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_bind_ok.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_bind_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# whose queue.bind-ok reply is still pending — confirm where it is populated.
def queues_awaiting_bind_ok
  @queues_awaiting_bind_ok
end

#queues_awaiting_declare_ok ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_declare_ok.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_declare_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# whose queue.declare-ok reply is still pending — confirm where it is populated.
def queues_awaiting_declare_ok
  @queues_awaiting_declare_ok
end

#queues_awaiting_delete_ok ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_delete_ok.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_delete_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# whose queue.delete-ok reply is still pending — confirm where it is populated.
def queues_awaiting_delete_ok
  @queues_awaiting_delete_ok
end

#queues_awaiting_get_response ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_get_response.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_get_response as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# with an outstanding basic.get whose response has not arrived — confirm where it is populated.
def queues_awaiting_get_response
  @queues_awaiting_get_response
end

#queues_awaiting_purge_ok ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_purge_ok.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_purge_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# whose queue.purge-ok reply is still pending — confirm where it is populated.
def queues_awaiting_purge_ok
  @queues_awaiting_purge_ok
end

#queues_awaiting_unbind_ok ⇒ Object (readonly)

Returns the value of attribute queues_awaiting_unbind_ok.



33
34
35
# File 'lib/amq/client/async/channel.rb', line 33

# Read-only accessor: returns @queues_awaiting_unbind_ok as-is (no copy is made).
# NOTE(review): the ivar is assigned outside this excerpt; presumably it tracks queues
# whose queue.unbind-ok reply is still pending — confirm where it is populated.
def queues_awaiting_unbind_ok
  @queues_awaiting_unbind_ok
end

Instance Method Details

#acknowledge(delivery_tag, multiple = false) ⇒ Object

Acknowledge one or all messages on the channel.

See Also:

  • AMQP 0.9.1 protocol reference (Section 1.8.3.13.)


137
138
139
140
141
# File 'lib/amq/client/async/channel.rb', line 137

# Encodes a basic.ack frame for +delivery_tag+ on this channel and hands it to the connection.
#
# @param delivery_tag [Object] tag of the delivery being acknowledged; passed to the encoder untouched
# @param multiple [Boolean] forwarded to Protocol::Basic::Ack.encode as its last argument (false by default)
# @return [self] the channel itself, so calls can be chained
def acknowledge(delivery_tag, multiple = false)
  ack_frame = Protocol::Basic::Ack.encode(id, delivery_tag, multiple)
  @connection.send_frame(ack_frame)
  self
end

#auto_recover ⇒ Object

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



322
323
324
325
326
327
328
329
330
# File 'lib/amq/client/async/channel.rb', line 322

# Re-opens this channel and then recovers the entities registered on it.
# Does nothing (returns nil) unless the channel is in automatic recovery mode.
#
# @return [Object, nil] whatever #open returns, or nil when auto recovery is off
def auto_recover
  return unless auto_recovering?

  self.open do
    # Exchanges go first: queue recovery also restores bindings, which need the exchanges. MK.
    @exchanges.each_value { |exchange| exchange.auto_recover }
    @queues.each_value { |queue| queue.auto_recover }
  end
end

#auto_recovering? ⇒ Boolean

Returns true if this channel uses automatic recovery mode.

Returns:

  • (Boolean)

    true if this channel uses automatic recovery mode



69
70
71
# File 'lib/amq/client/async/channel.rb', line 69

# Tells whether this channel uses automatic recovery mode.
# @auto_recovery is coerced to true/false in the constructor from the :auto_recovery option,
# which defaults to the connection's own auto_recovering? value.
#
# @return [Boolean]
def auto_recovering?
  @auto_recovery
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



286
287
288
# File 'lib/amq/client/async/channel.rb', line 286

# Stores +block+ under the :before_recovery event via redefine_callback, so a later call
# replaces the block registered earlier.
#
# @return [Object] whatever redefine_callback returns (defined outside this excerpt)
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) ⇒ Object

Closes AMQP channel.



121
122
123
124
125
# File 'lib/amq/client/async/channel.rb', line 121

# Sends channel.close for this channel and (re)registers +block+ as the :close callback.
# The frame is sent first, the callback is registered afterwards.
#
# @param reply_code [Integer] reply code placed in the frame (200 by default)
# @param reply_text [String] reply text placed in the frame (DEFAULT_REPLY_TEXT by default)
# @param class_id [Integer] class id placed in the frame (0 by default)
# @param method_id [Integer] method id placed in the frame (0 by default)
# @return [Object] whatever redefine_callback returns (defined outside this excerpt)
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  close_frame = Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id)
  @connection.send_frame(close_frame)

  redefine_callback(:close, &block)
end

#connection ⇒ AMQ::Client::Connection

AMQP connection this channel belongs to.

Returns:

  • (AMQ::Client::Connection)

    Connection this channel belongs to.



93
94
95
# File 'lib/amq/client/async/channel.rb', line 93

# Returns the connection this channel belongs to (@connection).
# NOTE(review): @connection is not assigned in the visible constructor body; presumably
# super(connection) sets it — confirm in the parent class.
#
# @return [AMQ::Client::Connection]
def connection
  @connection
end

#consumers ⇒ Hash<String, Consumer>

Returns:



75
76
77
# File 'lib/amq/client/async/channel.rb', line 75

# Returns the @consumers Hash itself (created empty in the constructor), not a copy,
# so mutations made by the caller are visible to the channel.
#
# @return [Hash<String, Consumer>]
def consumers
  @consumers
end

#exchanges ⇒ Array<Exchange>

Returns Collection of exchanges that were declared on this channel.

Returns:

  • (Array<Exchange>)

    Collection of exchanges that were declared on this channel.



85
86
87
# File 'lib/amq/client/async/channel.rb', line 85

# Returns the exchanges currently held in the @exchanges Hash (keyed by name, see
# #register_exchange) as a freshly built Array; the Hash itself is not exposed.
#
# @return [Array<Exchange>]
def exchanges
  @exchanges.values
end

#find_exchange(name) ⇒ AMQ::Client::Exchange

Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.

Parameters:

  • name (String)

    Exchange name

Returns:

  • (AMQ::Client::Exchange)

    Exchange (if found)



351
352
353
# File 'lib/amq/client/async/channel.rb', line 351

# Looks +name+ up in the @exchanges Hash. The Hash is created without a default value in the
# constructor, so an unknown name yields nil.
#
# @param name [String] exchange name
# @return [AMQ::Client::Exchange, nil] the registered exchange, or nil when none is cached
def find_exchange(name)
  @exchanges[name]
end

#flow(active = false, &block) ⇒ Object

Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.

Parameters:

  • active (Boolean) (defaults to: false)

    Desired flow state.

See Also:

  • AMQP 0.9.1 protocol reference (Section 1.5.2.3.)


196
197
198
199
200
201
# File 'lib/amq/client/async/channel.rb', line 196

# Sends channel.flow with the desired state and (re)registers +block+ as the :flow callback.
# The frame is sent first, the callback is registered afterwards.
#
# @param active [Boolean] desired flow state, forwarded to the encoder (false by default)
# @return [self] the channel itself, so calls can be chained
def flow(active = false, &block)
  flow_frame = Protocol::Channel::Flow.encode(@id, active)
  @connection.send_frame(flow_frame)
  redefine_callback(:flow, &block)

  self
end

#flow_is_active? ⇒ Boolean

Returns True if flow in this channel is active (messages will be delivered to consumers that use this channel).

Returns:

  • (Boolean)

    True if flow in this channel is active (messages will be delivered to consumers that use this channel).



206
207
208
# File 'lib/amq/client/async/channel.rb', line 206

# Predicate form of the flow_is_active attribute: returns @flow_is_active as stored,
# without coercing it to true/false.
# NOTE(review): the ivar is assigned outside this excerpt — confirm it always holds a boolean.
#
# @return [Boolean]
def flow_is_active?
  @flow_is_active
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



266
267
268
# File 'lib/amq/client/async/channel.rb', line 266

# Stores +block+ under the :after_connection_interruption event via redefine_callback,
# so a later call replaces the block registered earlier.
#
# @return [Object] whatever redefine_callback returns (defined outside this excerpt)
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_error(&block) ⇒ Object

Defines a callback that will be executed when channel is closed after channel-level exception.



257
258
259
# File 'lib/amq/client/async/channel.rb', line 257

# Registers +block+ under the :error event. Unlike the recovery hooks in this class, this
# goes through define_callback rather than redefine_callback.
# NOTE(review): define_callback presumably adds to the existing callbacks instead of
# replacing them — confirm in the Callbacks mixin.
#
# @return [Object] whatever define_callback returns (defined outside this excerpt)
def on_error(&block)
  self.define_callback(:error, &block)
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



304
305
306
# File 'lib/amq/client/async/channel.rb', line 304

# Stores +block+ under the :after_recovery event via redefine_callback, so a later call
# replaces the block registered earlier.
#
# @return [Object] whatever redefine_callback returns (defined outside this excerpt)
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#open(&block) ⇒ Object Also known as: reopen

Opens AMQP channel.



109
110
111
112
113
114
115
# File 'lib/amq/client/async/channel.rb', line 109

# Sends channel.open, registers this channel with its connection under its id, marks it
# as :opening and (re)registers +block+ as the :open callback — in that order.
#
# @return [Object] whatever redefine_callback returns (defined outside this excerpt)
def open(&block)
  open_frame = Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING)
  @connection.send_frame(open_frame)

  # Make the channel reachable through the connection's channel table.
  @connection.channels[@id] = self
  self.status = :opening

  redefine_callback(:open, &block)
end

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object

Note:

RabbitMQ as of 2.3.1 does not support prefetch_size.

Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.



179
180
181
182
183
184
# File 'lib/amq/client/async/channel.rb', line 179

# Sends basic.qos for this channel and (re)registers +block+ as the :qos callback.
# The frame is sent first, the callback is registered afterwards.
#
# @param prefetch_size [Integer] forwarded to the encoder (0 by default)
# @param prefetch_count [Integer] forwarded to the encoder (32 by default)
# @param global [Boolean] forwarded to the encoder (false by default)
# @return [self] the channel itself, so calls can be chained
def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
  qos_frame = Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global)
  @connection.send_frame(qos_frame)
  redefine_callback(:qos, &block)

  self
end

#queues ⇒ Array<Queue>

Returns Collection of queues that were declared on this channel.

Returns:

  • (Array<Queue>)

    Collection of queues that were declared on this channel.



80
81
82
# File 'lib/amq/client/async/channel.rb', line 80

# Returns the queues currently held in the @queues Hash as a freshly built Array;
# the Hash itself is not exposed.
#
# @return [Array<Queue>]
def queues
  @queues.values
end

#recover(requeue = true, &block) ⇒ Channel

Note:

RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.

Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.

Returns:

See Also:

  • AMQP 0.9.1 protocol reference (Section 1.8.3.16.)


161
162
163
164
165
166
# File 'lib/amq/client/async/channel.rb', line 161

# Sends basic.recover for this channel and (re)registers +block+ as the :recover callback.
# The frame is sent first, the callback is registered afterwards.
#
# @param requeue [Boolean] forwarded to the encoder (true by default)
# @return [Channel] self, so calls can be chained
def recover(requeue = true, &block)
  recover_frame = Protocol::Basic::Recover.encode(@id, requeue)
  @connection.send_frame(recover_frame)
  redefine_callback(:recover, &block)

  self
end

#register_exchange(exchange) ⇒ Object

Implementation

Raises:

  • (ArgumentError)


339
340
341
342
343
# File 'lib/amq/client/async/channel.rb', line 339

# Caches +exchange+ on this channel under its name, replacing any exchange previously
# stored under the same name.
#
# @param exchange [#name] exchange to cache
# @return [Object] the exchange that was passed in
# @raise [ArgumentError] when +exchange+ is nil
def register_exchange(exchange)
  raise(ArgumentError, "argument is nil!") if exchange.nil?

  exchange_name = exchange.name
  @exchanges[exchange_name] = exchange
end

#reject(delivery_tag, requeue = true) ⇒ Object

Reject a message with given delivery tag.

See Also:

  • AMQP 0.9.1 protocol reference (Section 1.8.3.14.)


147
148
149
150
151
# File 'lib/amq/client/async/channel.rb', line 147

# Encodes a basic.reject frame for +delivery_tag+ on this channel and hands it to the connection.
#
# @param delivery_tag [Object] tag of the delivery being rejected; passed to the encoder untouched
# @param requeue [Boolean] forwarded to Protocol::Basic::Reject.encode as its last argument (true by default)
# @return [self] the channel itself, so calls can be chained
def reject(delivery_tag, requeue = true)
  reject_frame = Protocol::Basic::Reject.encode(id, delivery_tag, requeue)
  @connection.send_frame(reject_frame)
  self
end

#synchronize(&block) ⇒ Object

Synchronizes given block using this channel’s mutex.



99
100
101
# File 'lib/amq/client/async/channel.rb', line 99

# Runs +block+ while holding this channel's mutex (@mutex, a Mutex created in the constructor)
# and returns the block's result. Mutex is not reentrant, so calling this again from inside
# the block on the same thread raises ThreadError.
def synchronize(&block)
  @mutex.synchronize(&block)
end

#tx_commit(&block) ⇒ Object

Commits AMQP transaction.



230
231
232
233
234
235
# File 'lib/amq/client/async/channel.rb', line 230

# Sends tx.commit for this channel and (re)registers +block+ as the :tx_commit callback.
# The frame is sent first, the callback is registered afterwards.
#
# @return [self] the channel itself, so calls can be chained
def tx_commit(&block)
  commit_frame = Protocol::Tx::Commit.encode(@id)
  @connection.send_frame(commit_frame)
  redefine_callback(:tx_commit, &block)

  self
end

#tx_rollback(&block) ⇒ Object

Rolls AMQP transaction back.



240
241
242
243
244
245
# File 'lib/amq/client/async/channel.rb', line 240

# Sends tx.rollback for this channel and (re)registers +block+ as the :tx_rollback callback.
# The frame is sent first, the callback is registered afterwards.
#
# @return [self] the channel itself, so calls can be chained
def tx_rollback(&block)
  rollback_frame = Protocol::Tx::Rollback.encode(@id)
  @connection.send_frame(rollback_frame)
  redefine_callback(:tx_rollback, &block)

  self
end

#tx_select(&block) ⇒ Object

Sets the channel to use standard transactions. One must use this method at least once on a channel before using #tx_commit or #tx_rollback methods.



220
221
222
223
224
225
# File 'lib/amq/client/async/channel.rb', line 220

# Sends tx.select for this channel and (re)registers +block+ as the :tx_select callback.
# The frame is sent first, the callback is registered afterwards.
#
# @return [self] the channel itself, so calls can be chained
def tx_select(&block)
  select_frame = Protocol::Tx::Select.encode(@id)
  @connection.send_frame(select_frame)
  redefine_callback(:tx_select, &block)

  self
end