Class: AMQP::Client::Connection::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client/channel.rb

Overview

AMQP Channel

Queue collapse

QueueOk =

Response when declaring a Queue

Data.define(:queue_name, :message_count, :consumer_count)

Basic collapse

ConsumeOk =

Response when subscribing (starting a consumer)

Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

Queue collapse

Basic collapse

Instance Attribute Summary collapse

Exchange collapse

Queue collapse

Basic collapse

Confirm collapse

Transaction collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, id) ⇒ Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Should only be called from Connection



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/amqp/client/channel.rb', line 16

# Set up the state of a not-yet-opened channel.
# Should only be called from Connection.
# @param connection connection this channel belongs to
# @param id channel ID
# @api private
def initialize(connection, id)
  @connection = connection
  @id = id

  # Lifecycle flags: neither opened nor closed yet
  @open = false
  @closed = nil

  # Registered consumers keyed by consumer tag, and the optional return handler
  @consumers = {}
  @on_return = nil

  # Publisher-confirm bookkeeping (@confirm stays nil until confirm mode is selected)
  @confirm = nil
  @nacked = false
  @unconfirmed = []
  @unconfirmed_lock = Mutex.new
  @unconfirmed_empty = ConditionVariable.new

  # Incoming method replies and basic_get results
  @replies = ::Queue.new
  @basic_gets = ::Queue.new
end

Instance Attribute Details

#channel_idInteger



322
# File 'lib/amqp/client/channel.rb', line 322

# Response when subscribing (starting a consumer)
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

#connectionConnection (readonly)

Connection this channel belongs to



45
46
47
# File 'lib/amqp/client/channel.rb', line 45

# Connection this channel belongs to
# @return [Connection]
def connection = @connection

#consumer_countInteger



163
# File 'lib/amqp/client/channel.rb', line 163

# Response when declaring a Queue
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#consumer_tagString



322
# File 'lib/amqp/client/channel.rb', line 322

# Response when subscribing (starting a consumer)
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

#idInteger (readonly)

Channel ID



41
42
43
# File 'lib/amqp/client/channel.rb', line 41

# Channel ID
# @return [Integer]
def id = @id

#message_countInteger



163
# File 'lib/amqp/client/channel.rb', line 163

# Response when declaring a Queue
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#queue_nameString



163
# File 'lib/amqp/client/channel.rb', line 163

# Response when declaring a Queue
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#worker_threadsArray<Thread>



322
# File 'lib/amqp/client/channel.rb', line 322

# Response when subscribing (starting a consumer)
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

Instance Method Details

#basic_ack(delivery_tag, multiple: false) ⇒ nil

Acknowledge a message



411
412
413
414
# File 'lib/amqp/client/channel.rb', line 411

# Acknowledge a message
# @param delivery_tag delivery tag of the message to acknowledge
# @param multiple flag passed through to the ack frame
# @return [nil]
def basic_ack(delivery_tag, multiple: false)
  ack_frame = FrameBytes.basic_ack(@id, delivery_tag, multiple)
  write_bytes(ack_frame)
  nil
end

#basic_cancel(consumer_tag, no_wait: false) ⇒ nil

Cancel/abort/stop a consumer



385
386
387
388
389
390
391
392
393
394
# File 'lib/amqp/client/channel.rb', line 385

# Cancel/abort/stop a consumer
#
# Does nothing when no consumer is registered under +consumer_tag+.
# @param consumer_tag [String] tag of the consumer to cancel
# @param no_wait [Boolean] when true the broker is asked not to reply and
#   no :basic_cancel_ok reply is awaited
# @return [nil]
def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers[consumer_tag]
  return unless consumer

  # Forward no_wait to the frame: otherwise the broker still sends cancel-ok,
  # which nobody reads when no_wait is true and which then lingers as a stray reply.
  write_bytes FrameBytes.basic_cancel(@id, consumer_tag, no_wait:)
  expect(:basic_cancel_ok) unless no_wait
  @consumers.delete(consumer_tag)
  close_consumer(consumer)
  nil
end

#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?

Consume messages from a queue

Yields:

  • (Message)

    Delivered message from the queue

Raises:

  • (ArgumentError)


338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/amqp/client/channel.rb', line 338

# Consume messages from a queue
#
# With +worker_threads+ zero the calling thread runs the consume loop itself and
# the method returns nil once that loop returns; otherwise the given number of
# threads is started and the registered ConsumeOk is returned.
# @param queue name of the queue to consume from
# @param tag [String] consumer tag; must be non-empty when +no_wait+ is true
# @param no_ack flag passed through to the consume frame
# @param exclusive flag passed through to the consume frame
# @param no_wait [Boolean] when true no :basic_consume_ok reply is awaited
# @param arguments passed through to the consume frame
# @param worker_threads [Integer] number of threads running the consume loop
# @param on_cancel stored on the ConsumeOk for use when the consumer is cancelled
# @yield [Message] Delivered message from the queue
# @return [ConsumeOk, nil]
# @raise [ArgumentError] if +no_wait+ is true and +tag+ is empty
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false,
                  arguments: {}, worker_threads: 1, on_cancel: nil, &blk)
  raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty?

  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments)
  # With no_wait there is no consume-ok to read the tag from, so the caller's tag
  # is the consumer tag; previously it stayed nil and the consumer was registered under nil.
  consumer_tag = tag
  consumer_tag, = expect(:basic_consume_ok) unless no_wait
  msg_q = ::Queue.new
  if worker_threads.zero?
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: [], msg_q:, on_cancel:)
    consume_loop(msg_q, consumer_tag, &blk)
    nil
  else
    threads = Array.new(worker_threads) do
      Thread.new { consume_loop(msg_q, consumer_tag, &blk) }
    end
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: threads, msg_q:, on_cancel:)
  end
end

#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message

Consume a single message from a queue

Yields:

  • Block in which the message will be yielded

Raises:

  • (Timeout::Error)

    if no response is received within the timeout period



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/amqp/client/channel.rb', line 365

# Consume a single message from a queue
#
# Registers a temporary consumer under a random "consume-once-N" tag, calls the
# given block (without arguments) if there is one, waits for one message,
# then cancels and closes the temporary consumer.
# @param queue name of the queue to consume from
# @param timeout seconds to wait for a message; nil waits without a time limit
# @yield called once after the consumer is registered, before waiting
# @return [Message]
# @raise [Timeout::Error] if no message is received within the timeout period
def basic_consume_once(queue, timeout: nil, &)
  once_tag = "consume-once-#{rand(1024)}"
  write_bytes(FrameBytes.basic_consume(@id, queue, once_tag, true, false, true, nil))
  inbox = ::Queue.new
  @consumers[once_tag] = ConsumeOk.new(
    channel_id: @id, consumer_tag: once_tag, worker_threads: [], msg_q: inbox, on_cancel: nil
  )
  yield if block_given?
  received = inbox.pop(timeout:)
  write_bytes(FrameBytes.basic_cancel(@id, once_tag, no_wait: true))
  close_consumer(@consumers.delete(once_tag))
  raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && received.nil?

  received
end

#basic_get(queue_name, no_ack: true) ⇒ Message?

Get a message from a queue (by polling)



242
243
244
245
246
247
248
249
# File 'lib/amqp/client/channel.rb', line 242

# Get a message from a queue (by polling)
# @param queue_name name of the queue to poll
# @param no_ack flag passed through to the get frame
# @return [Message, nil] the message, or nil when the reply is :basic_get_empty
# @raise [Error::Closed] when the reply queue yields nil
def basic_get(queue_name, no_ack: true)
  write_bytes(FrameBytes.basic_get(@id, queue_name, no_ack))
  answer = @basic_gets.pop
  return answer if answer.is_a?(Message)
  raise Error::Closed.new(@id, *@closed) if answer.nil?

  nil # :basic_get_empty
end

#basic_get_emptyObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



541
542
543
# File 'lib/amqp/client/channel.rb', line 541

# Record an empty basic_get result by pushing :basic_get_empty onto the
# basic_get queue.
# @api private
def basic_get_empty
  @basic_gets << :basic_get_empty
end

#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil

Negatively acknowledge a message



421
422
423
424
# File 'lib/amqp/client/channel.rb', line 421

# Negatively acknowledge a message
# @param delivery_tag delivery tag of the message
# @param multiple flag passed through to the nack frame
# @param requeue flag passed through to the nack frame
# @return [nil]
def basic_nack(delivery_tag, multiple: false, requeue: false)
  nack_frame = FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  write_bytes(nack_frame)
  nil
end

#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil

Publishes a message to an exchange

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/amqp/client/channel.rb', line 272

# Publishes a message to an exchange
#
# The :mandatory and :persistent keys are removed from +properties+ before the
# header frame is built; persistent true/false is translated into
# delivery_mode 2/1. In confirm mode the next delivery tag is recorded as
# unconfirmed before anything is written.
# @param body message body; it is written in a single call together with the
#   method and header frames when it fits in one body frame, otherwise it is
#   split over several body frames
# @param exchange name of the exchange to publish to
# @param routing_key routing key of the message
# @param properties message properties (plus :mandatory and :persistent)
# @return [nil]
def basic_publish(body, exchange:, routing_key: "", **properties)
  max_chunk = @connection.frame_max - 8
  channel_id = @id
  mandatory = properties.delete(:mandatory) || false
  persistent = properties.delete(:persistent)
  if persistent.equal?(true)
    properties[:delivery_mode] = 2
  elsif persistent.equal?(false)
    properties[:delivery_mode] = 1
  end
  @unconfirmed_lock.synchronize { @unconfirmed.push(@confirm += 1) } if @confirm

  total = body.bytesize
  method_frame = FrameBytes.basic_publish(channel_id, exchange, routing_key, mandatory)
  header_frame = FrameBytes.header(channel_id, total, properties)
  if total.between?(1, max_chunk)
    # Fast path: everything goes out in one write
    write_bytes(method_frame, header_frame, FrameBytes.body(channel_id, body))
  else
    write_bytes(method_frame, header_frame)
    offset = 0
    until offset >= total # one body frame per chunk of at most max_chunk bytes
      len = [max_chunk, total - offset].min
      write_bytes(FrameBytes.body(channel_id, body.byteslice(offset, len)))
      offset += len
    end
  end
  nil
end

#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean

Publish a message and block until the broker has confirmed that it has received the message

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message



309
310
311
312
313
# File 'lib/amqp/client/channel.rb', line 309

# Publish a message and block until it has been confirmed.
#
# Selects confirm mode (without waiting for a reply), publishes through
# #basic_publish and then returns the result of #wait_for_confirms.
# @param body message body
# @param exchange name of the exchange to publish to
# @param routing_key routing key of the message
# @param properties forwarded unchanged to #basic_publish
# @return [Boolean]
def basic_publish_confirm(body, exchange:, routing_key: "", **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange: exchange, routing_key: routing_key, **properties)
  wait_for_confirms
end

#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil

Specify how many messages to prefetch for consumers with `no_ack: false`



401
402
403
404
405
# File 'lib/amqp/client/channel.rb', line 401

# Specify how many messages to prefetch for consumers with `no_ack: false`
# @param prefetch_count number of messages to prefetch
# @param prefetch_size passed through to the qos frame
# @param global passed through to the qos frame
# @return [nil]
def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  qos_frame = FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  write_bytes(qos_frame)
  expect(:basic_qos_ok)
  nil
end

#basic_recover(requeue: false) ⇒ nil

Recover all the unacknowledged messages



439
440
441
442
443
# File 'lib/amqp/client/channel.rb', line 439

# Recover all the unacknowledged messages
# @param requeue flag passed through to the recover frame
# @return [nil]
def basic_recover(requeue: false)
  recover_frame = FrameBytes.basic_recover(@id, requeue: requeue)
  write_bytes(recover_frame)
  expect(:basic_recover_ok)
  nil
end

#basic_reject(delivery_tag, requeue: false) ⇒ nil

Reject a message



430
431
432
433
# File 'lib/amqp/client/channel.rb', line 430

# Reject a message
# @param delivery_tag delivery tag of the message to reject
# @param requeue flag passed through to the reject frame
# @return [nil]
def basic_reject(delivery_tag, requeue: false)
  reject_frame = FrameBytes.basic_reject(@id, delivery_tag, requeue)
  write_bytes(reject_frame)
  nil
end

#body_delivered(body_part) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



557
558
559
560
561
562
563
# File 'lib/amqp/client/channel.rb', line 557

# Append a body part to the message being assembled; once the announced body
# size has been reached the body is set on the message and the message is
# finished.
# @param body_part chunk of the message body
# @api private
def body_delivered(body_part)
  buffer = @next_body
  buffer.write(body_part)
  if buffer.pos == @next_body_size
    @next_msg.body = buffer.string
    next_message_finished!
  end
end

#cancel_consumer(tag) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle consumer cancellation from the broker



567
568
569
570
571
572
573
574
575
576
577
578
# File 'lib/amqp/client/channel.rb', line 567

# Handle consumer cancellation from the broker
#
# Removes and closes the consumer registered under +tag+ (if any) and then
# calls its on_cancel callback with the consumer tag. A StandardError raised by
# the callback is reported with +warn+ instead of being propagated.
# @param tag consumer tag
# @return [nil]
# @api private
def cancel_consumer(tag)
  if (cancelled = @consumers.delete(tag))
    close_consumer(cancelled)
    begin
      callback = cancelled.on_cancel
      callback&.call(cancelled.consumer_tag)
    rescue StandardError => e
      warn "AMQP-Client consumer on_cancel callback error: #{e.class}: #{e.message}"
    end
  end
  nil
end

#close(reason: "", code: 200) ⇒ nil

Gracefully close channel



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/amqp/client/channel.rb', line 63

# Gracefully close channel
#
# A no-op when the channel is already marked closed. Otherwise sends the close
# frame, records the close details, expects :channel_close_ok, closes the
# reply and basic_get queues, wakes threads waiting for confirms and closes
# every consumer.
# @param reason reason passed in the close frame
# @param code code passed in the close frame
# @return [nil]
def close(reason: "", code: 200)
  return if @closed

  write_bytes(FrameBytes.channel_close(@id, reason, code))
  @closed = [:channel, code, reason]
  expect(:channel_close_ok)
  [@replies, @basic_gets].each(&:close)
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value { |consumer| close_consumer(consumer) }
  nil
end

#closed!(level, code, reason, classid, methodid) ⇒ nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called when channel is closed by broker



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/amqp/client/channel.rb', line 80

# Called when channel is closed by broker
#
# Records the close details, closes the reply and basic_get queues, wakes
# threads waiting for confirms and closes every consumer.
# @param level first element of the stored close details
# @param code close code
# @param reason close reason
# @param classid stored as part of the close details
# @param methodid stored as part of the close details
# @return [nil]
# @api private
def closed!(level, code, reason, classid, methodid)
  @closed = [level, code, reason, classid, methodid]
  [@replies, @basic_gets].each(&:close)
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value do |consumer|
    close_consumer(consumer)
    consumer.msg_q.clear # empty the queues too, messages can't be acked anymore
  end
  nil
end

#confirm(args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by Connection when received ack/nack from broker



481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/amqp/client/channel.rb', line 481

# Called by Connection when received ack/nack from broker
#
# With multiple true every unconfirmed tag up to and including the given
# delivery tag is removed; with multiple false only that tag is removed.
# A :nack marks the current round as nacked, and waiters are woken once no
# unconfirmed tags remain.
# @param args array of (ack_or_nack, delivery_tag, multiple)
# @raise [RuntimeError] if the delivery tag is not among the unconfirmed tags
# @api private
def confirm(args)
  kind, tag, multiple = *args
  @unconfirmed_lock.synchronize do
    if multiple.equal?(true)
      position = @unconfirmed.index(tag)
      raise "Delivery tag not found" unless position

      @unconfirmed.shift(position + 1)
    elsif multiple.equal?(false)
      raise "Delivery tag not found" unless @unconfirmed.delete(tag)
    end
    @nacked = true if kind == :nack
    @unconfirmed_empty.broadcast if @unconfirmed.empty?
  end
end

#confirm_select(no_wait: false) ⇒ nil

Put the channel in confirm mode, each published message will then be confirmed by the broker



451
452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/amqp/client/channel.rb', line 451

# Put the channel in confirm mode, each published message will then be confirmed by the broker
#
# Does nothing when confirm mode has already been selected.
# @param no_wait [Boolean] when true no :confirm_select_ok reply is awaited
# @return [nil]
def confirm_select(no_wait: false)
  return if @confirm # fast path, no locking needed

  @unconfirmed_lock.synchronize do
    # re-check under the lock: another thread may have selected confirm mode meanwhile
    unless @confirm
      write_bytes(FrameBytes.confirm_select(@id, no_wait))
      expect(:confirm_select_ok) unless no_wait
      @confirm = 0
    end
  end
  nil
end

#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil

Bind an exchange to another exchange



135
136
137
138
139
# File 'lib/amqp/client/channel.rb', line 135

# Bind an exchange to another exchange
# @param source name of the source exchange
# @param destination name of the destination exchange
# @param binding_key binding key
# @param arguments passed through to the bind frame
# @return [nil]
def exchange_bind(source:, destination:, binding_key:, arguments: {})
  bind_frame = FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  write_bytes(bind_frame)
  expect(:exchange_bind_ok)
  nil
end

#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil

Declare an exchange



112
113
114
115
116
# File 'lib/amqp/client/channel.rb', line 112

# Declare an exchange
# @param name name of the exchange
# @param type type of the exchange
# @param passive passed through to the declare frame
# @param durable passed through to the declare frame
# @param auto_delete passed through to the declare frame
# @param internal passed through to the declare frame
# @param arguments passed through to the declare frame
# @return [nil]
def exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  declare_frame =
    FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  write_bytes(declare_frame)
  expect(:exchange_declare_ok)
  nil
end

#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil

Delete an exchange



123
124
125
126
127
# File 'lib/amqp/client/channel.rb', line 123

# Delete an exchange
# @param name name of the exchange
# @param if_unused passed through to the delete frame
# @param no_wait [Boolean] when true no :exchange_delete_ok reply is awaited
# @return [nil]
def exchange_delete(name, if_unused: false, no_wait: false)
  delete_frame = FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  write_bytes(delete_frame)
  expect(:exchange_delete_ok) unless no_wait
  nil
end

#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil

Unbind an exchange from another exchange



147
148
149
150
151
# File 'lib/amqp/client/channel.rb', line 147

# Unbind an exchange from another exchange
# @param source name of the source exchange
# @param destination name of the destination exchange
# @param binding_key binding key
# @param arguments passed through to the unbind frame
# @return [nil]
def exchange_unbind(source:, destination:, binding_key:, arguments: {})
  unbind_frame = FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  write_bytes(unbind_frame)
  expect(:exchange_unbind_ok)
  nil
end

#header_delivered(body_size, properties) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



546
547
548
549
550
551
552
553
554
# File 'lib/amqp/client/channel.rb', line 546

# Store the properties on the message being assembled. A message with an empty
# body is finished right away; otherwise a buffer for the announced body size
# is prepared for the body parts to come.
# @param body_size announced size of the message body
# @param properties message properties
# @api private
def header_delivered(body_size, properties)
  @next_msg.properties = properties
  return next_message_finished! if body_size.zero?

  @next_body = StringIO.new(String.new(capacity: body_size))
  @next_body_size = body_size
end

#inspectObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Override #inspect



34
35
36
37
# File 'lib/amqp/client/channel.rb', line 34

def inspect
  "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{[email protected]?} " \
    "consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
end

#message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



536
537
538
# File 'lib/amqp/client/channel.rb', line 536

# Start assembling a delivered message: a new Message for this channel becomes
# the message under construction.
# @param consumer_tag consumer tag of the delivery
# @param delivery_tag delivery tag of the delivery
# @param redelivered redelivered flag of the delivery
# @param exchange exchange of the delivery
# @param routing_key routing key of the delivery
# @api private
def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
  delivery = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered)
  @next_msg = delivery
end

#message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



531
532
533
# File 'lib/amqp/client/channel.rb', line 531

# Start assembling a returned message: a new ReturnMessage becomes the message
# under construction.
# @param reply_code reply code of the return
# @param reply_text reply text of the return
# @param exchange exchange of the returned message
# @param routing_key routing key of the returned message
# @api private
def message_returned(reply_code, reply_text, exchange, routing_key)
  returned = ReturnMessage.new(reply_code, reply_text, exchange, routing_key)
  @next_msg = returned
end

#on_return {|ReturnMessage| ... } ⇒ Object

Handle returned messages in this block. If not set the message will just be logged to STDERR

Yields:

  • (ReturnMessage)

    Messages returned by the broker when a publish has failed



95
96
97
98
# File 'lib/amqp/client/channel.rb', line 95

# Handle returned messages in this block. If not set the message will just be logged to STDERR
# @yield [ReturnMessage] Messages returned by the broker when a publish has failed
# @return [nil]
def on_return(&handler)
  @on_return = handler
  nil
end

#openChannel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Open the channel (called from Connection)



50
51
52
53
54
55
56
57
# File 'lib/amqp/client/channel.rb', line 50

# Open the channel (called from Connection)
#
# Only the first call sends the open frame and expects :channel_open_ok;
# later calls just return the channel.
# @return [Channel] self
# @api private
def open
  unless @open
    @open = true
    write_bytes(FrameBytes.channel_open(@id))
    expect(:channel_open_ok)
  end
  self
end

#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil

Bind a queue to an exchange



205
206
207
208
209
# File 'lib/amqp/client/channel.rb', line 205

# Bind a queue to an exchange
# @param name name of the queue
# @param exchange name of the exchange
# @param binding_key binding key
# @param arguments passed through to the bind frame
# @return [nil]
def queue_bind(name, exchange:, binding_key: "", arguments: {})
  bind_frame = FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  write_bytes(bind_frame)
  expect(:queue_bind_ok)
  nil
end

#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk

Create a queue (operation is idempotent)



175
176
177
178
179
180
181
182
183
184
# File 'lib/amqp/client/channel.rb', line 175

# Create a queue (operation is idempotent)
#
# When +name+ is empty the queue is always declared as non-durable, exclusive
# and auto-deleted, whatever was passed for those options.
# @param name name of the queue
# @param passive passed through to the declare frame
# @param durable passed through to the declare frame (forced to false for an empty name)
# @param exclusive passed through to the declare frame (forced to true for an empty name)
# @param auto_delete passed through to the declare frame (forced to true for an empty name)
# @param arguments passed through to the declare frame
# @return [QueueOk] built from the values of the :queue_declare_ok reply
def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  if name.empty?
    durable = false
    exclusive = true
    auto_delete = true
  end

  declare_frame = FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  write_bytes(declare_frame)
  declared_name, messages, consumers = expect(:queue_declare_ok)

  QueueOk.new(declared_name, messages, consumers)
end

#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?

Delete a queue



193
194
195
196
197
# File 'lib/amqp/client/channel.rb', line 193

# Delete a queue
# @param name name of the queue
# @param if_unused passed through to the delete frame
# @param if_empty passed through to the delete frame
# @param no_wait [Boolean] when true no reply is awaited
# @return [Integer, nil] first value of the reply, or nil when +no_wait+ is true
def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes(FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait))
  return nil if no_wait

  # NOTE(review): the expected reply is :queue_delete, not :queue_delete_ok like the
  # other *_ok replies in this class - confirm it matches what Connection pushes.
  deleted_count, = expect(:queue_delete)
  deleted_count
end

#queue_purge(name, no_wait: false) ⇒ Integer?

Purge a queue



216
217
218
219
220
# File 'lib/amqp/client/channel.rb', line 216

# Purge a queue
# @param name name of the queue
# @param no_wait [Boolean] when true no :queue_purge_ok reply is awaited
# @return [Integer, nil] first value of the reply, or nil when +no_wait+ is true
def queue_purge(name, no_wait: false)
  write_bytes(FrameBytes.queue_purge(@id, name, no_wait))
  return nil if no_wait

  purged_count, = expect(:queue_purge_ok)
  purged_count
end

#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil

Unbind a queue from an exchange



228
229
230
231
232
# File 'lib/amqp/client/channel.rb', line 228

# Unbind a queue from an exchange
# @param name name of the queue
# @param exchange name of the exchange
# @param binding_key binding key
# @param arguments passed through to the unbind frame
# @return [nil]
def queue_unbind(name, exchange:, binding_key: "", arguments: {})
  unbind_frame = FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  write_bytes(unbind_frame)
  expect(:queue_unbind_ok)
  nil
end

#reply(args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



526
527
528
# File 'lib/amqp/client/channel.rb', line 526

# Push a reply onto the channel's reply queue.
# @param args the reply to enqueue
# @api private
def reply(args)
  @replies << args
end

#tx_commitnil

Commit a transaction, requires that the channel is in transaction mode



509
510
511
512
513
# File 'lib/amqp/client/channel.rb', line 509

# Commit a transaction, requires that the channel is in transaction mode
# @return [nil]
def tx_commit
  commit_frame = FrameBytes.tx_commit(@id)
  write_bytes(commit_frame)
  expect(:tx_commit_ok)
  nil
end

#tx_rollbacknil

Rollback a transaction, requires that the channel is in transaction mode



517
518
519
520
521
# File 'lib/amqp/client/channel.rb', line 517

# Rollback a transaction, requires that the channel is in transaction mode
# @return [nil]
def tx_rollback
  rollback_frame = FrameBytes.tx_rollback(@id)
  write_bytes(rollback_frame)
  expect(:tx_rollback_ok)
  nil
end

#tx_selectnil

Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish



501
502
503
504
505
# File 'lib/amqp/client/channel.rb', line 501

# Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish
# @return [nil]
def tx_select
  select_frame = FrameBytes.tx_select(@id)
  write_bytes(select_frame)
  expect(:tx_select_ok)
  nil
end

#wait_for_confirmsBoolean

Block until all published messages are confirmed



467
468
469
470
471
472
473
474
475
476
477
# File 'lib/amqp/client/channel.rb', line 467

# Block until all published messages are confirmed
#
# The nacked marker is reset on return, so every call reports on the publishes
# confirmed since the previous call.
# @return [Boolean] true if no nack was recorded, false otherwise
# @raise [Error::Closed] if the channel is closed while confirms are still pending
def wait_for_confirms
  @unconfirmed_lock.synchronize do
    until @unconfirmed.empty?
      # The broadcast on close happens only once: if the channel is already
      # closed, waiting here would block forever because no confirm can arrive.
      raise Error::Closed.new(@id, *@closed) if @closed

      @unconfirmed_empty.wait(@unconfirmed_lock)
      raise Error::Closed.new(@id, *@closed) if @closed
    end
    result = !@nacked
    @nacked = false # Reset for next round of publishes
    result
  end
end