Class: AMQP::Client::Connection::Channel
- Inherits: Object
  - Object
  - AMQP::Client::Connection::Channel
- Defined in: lib/amqp/client/channel.rb
Overview
AMQP Channel
Queue
- QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
  Response when declaring a queue.
Basic
- ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
  Response when subscribing (starting a consumer).
Queue
- #consumer_count ⇒ Integer
  Number of consumers subscribed to the queue at the time of declaration.
- #message_count ⇒ Integer
  Number of messages in the queue at the time of declaration.
- #queue_name ⇒ String
  The name of the queue.
Basic
- #channel_id ⇒ Integer
  The channel ID.
- #consumer_tag ⇒ String
  The consumer tag.
- #worker_threads ⇒ Array<Thread>
  Array of worker threads.
Instance Attribute Summary
- #connection ⇒ Connection (readonly)
  Connection this channel belongs to.
- #id ⇒ Integer (readonly)
  Channel ID.
Exchange
- #exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
  Bind an exchange to another exchange.
- #exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
  Declare an exchange.
- #exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
  Delete an exchange.
- #exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
  Unbind an exchange from another exchange.
Queue
- #queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
  Bind a queue to an exchange.
- #queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
  Create a queue (operation is idempotent).
- #queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
  Delete a queue.
- #queue_purge(name, no_wait: false) ⇒ Integer?
  Purge a queue.
- #queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
  Unbind a queue from an exchange.
Basic
- #basic_ack(delivery_tag, multiple: false) ⇒ nil
  Acknowledge a message.
- #basic_cancel(consumer_tag, no_wait: false) ⇒ nil
  Cancel/abort/stop a consumer.
- #basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
  Consume messages from a queue.
- #basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
  Consume a single message from a queue.
- #basic_get(queue_name, no_ack: true) ⇒ Message?
  Get a message from a queue (by polling).
- #basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
  Negatively acknowledge a message.
- #basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
  Publish a message to an exchange.
- #basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
  Publish a message and block until the broker has confirmed that it received it.
- #basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
  Specify how many messages to prefetch for consumers with `no_ack: false`.
- #basic_recover(requeue: false) ⇒ nil
  Recover all the unacknowledged messages.
- #basic_reject(delivery_tag, requeue: false) ⇒ nil
  Reject a message.
Confirm
- #confirm(args) ⇒ Object (private)
  Called by Connection when an ack/nack is received from the broker.
- #confirm_select(no_wait: false) ⇒ nil
  Put the channel in confirm mode; each published message will then be confirmed by the broker.
- #wait_for_confirms ⇒ Boolean
  Block until all published messages are confirmed.
Transaction
- #tx_commit ⇒ nil
  Commit a transaction; requires that the channel is in transaction mode.
- #tx_rollback ⇒ nil
  Rollback a transaction; requires that the channel is in transaction mode.
- #tx_select ⇒ nil
  Put the channel in transaction mode. Make sure to call #tx_commit or #tx_rollback after publishing.
Instance Method Summary
- #basic_get_empty ⇒ Object (private)
- #body_delivered(body_part) ⇒ Object (private)
- #cancel_consumer(tag) ⇒ Object (private)
  Handle consumer cancellation from the broker.
- #close(reason: "", code: 200) ⇒ nil
  Gracefully close channel.
- #closed!(level, code, reason, classid, methodid) ⇒ nil (private)
  Called when channel is closed by broker.
- #header_delivered(body_size, properties) ⇒ Object (private)
- #initialize(connection, id) ⇒ Channel (constructor, private)
  Should only be called from Connection.
- #inspect ⇒ Object (private)
  Override #inspect.
- #message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object (private)
- #message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object (private)
- #on_return {|ReturnMessage| ... } ⇒ Object
  Handle returned messages in this block.
- #open ⇒ Channel (private)
  Open the channel (called from Connection).
- #reply(args) ⇒ Object (private)
Constructor Details
#initialize(connection, id) ⇒ Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Should only be called from Connection
# File 'lib/amqp/client/channel.rb', line 16

def initialize(connection, id)
  @connection = connection
  @id = id
  @replies = ::Queue.new
  @consumers = {}
  @closed = nil
  @open = false
  @on_return = nil
  @confirm = nil
  @unconfirmed = []
  @unconfirmed_lock = Mutex.new
  @unconfirmed_empty = ConditionVariable.new
  @nacked = false
  @basic_gets = ::Queue.new
end
Instance Attribute Details
#channel_id ⇒ Integer
Returns The channel ID.
# File 'lib/amqp/client/channel.rb', line 322

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
#connection ⇒ Connection (readonly)
Connection this channel belongs to
# File 'lib/amqp/client/channel.rb', line 45

def connection
  @connection
end
#consumer_count ⇒ Integer
Returns Number of consumers subscribed to the queue at the time of declaration.
# File 'lib/amqp/client/channel.rb', line 163

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#consumer_tag ⇒ String
Returns The consumer tag.
# File 'lib/amqp/client/channel.rb', line 322

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
#id ⇒ Integer (readonly)
Channel ID
# File 'lib/amqp/client/channel.rb', line 41

def id
  @id
end
#message_count ⇒ Integer
Returns Number of messages in the queue at the time of declaration.
# File 'lib/amqp/client/channel.rb', line 163

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#queue_name ⇒ String
Returns The name of the queue.
# File 'lib/amqp/client/channel.rb', line 163

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#worker_threads ⇒ Array<Thread>
Returns Array of worker threads.
# File 'lib/amqp/client/channel.rb', line 322

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
Instance Method Details
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message
# File 'lib/amqp/client/channel.rb', line 411

def basic_ack(delivery_tag, multiple: false)
  write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
  nil
end
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer
# File 'lib/amqp/client/channel.rb', line 385

def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers[consumer_tag]
  return unless consumer

  write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
  expect(:basic_cancel_ok) unless no_wait
  @consumers.delete(consumer_tag)
  close_consumer(consumer)
  nil
end
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
Consume messages from a queue
# File 'lib/amqp/client/channel.rb', line 338

def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false,
                  arguments: {}, worker_threads: 1, on_cancel: nil, &blk)
  raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty?

  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments)
  consumer_tag, = expect(:basic_consume_ok) unless no_wait
  msg_q = ::Queue.new
  if worker_threads.zero?
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: [], msg_q:, on_cancel:)
    consume_loop(msg_q, consumer_tag, &blk)
    nil
  else
    threads = Array.new(worker_threads) do
      Thread.new { consume_loop(msg_q, consumer_tag, &blk) }
    end
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: threads, msg_q:, on_cancel:)
  end
end
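With worker_threads greater than zero, the listing above starts that many threads which all read from one shared queue. The following is a minimal sketch of that pattern using only Ruby's standard library. The helper name start_workers and the sample messages are assumptions made for illustration; they are not part of the amqp-client API, and the real consume_loop may differ.

```ruby
# Sketch only: a pool of worker threads draining one shared queue,
# similar in spirit to basic_consume with worker_threads: N.
# `start_workers` is a hypothetical helper, not an amqp-client method.
def start_workers(msg_q, count, &blk)
  Array.new(count) do
    Thread.new do
      # Queue#pop returns nil once the queue is closed and empty,
      # which ends the loop and lets the thread finish.
      while (msg = msg_q.pop)
        blk.call(msg)
      end
    end
  end
end

msg_q = Queue.new
processed = Queue.new # thread-safe collector for the results
threads = start_workers(msg_q, 2) { |msg| processed << msg.upcase }

%w[a b c].each { |m| msg_q << m }
msg_q.close           # no more messages; workers drain the queue and exit
threads.each(&:join)

results = []
results << processed.pop until processed.empty?
```

Closing the queue is what stops the workers here, which is one plausible reason a consumer's message queue is closed when the consumer is cancelled.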
#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
Consume a single message from a queue
# File 'lib/amqp/client/channel.rb', line 365

def basic_consume_once(queue, timeout: nil, &)
  tag = "consume-once-#{rand(1024)}"
  write_bytes FrameBytes.basic_consume(@id, queue, tag, true, false, true, nil)
  msg_q = ::Queue.new
  @consumers[tag] =
    ConsumeOk.new(channel_id: @id, consumer_tag: tag, worker_threads: [], msg_q:, on_cancel: nil)
  yield if block_given?
  msg = msg_q.pop(timeout:)
  write_bytes FrameBytes.basic_cancel(@id, tag, no_wait: true)
  consumer = @consumers.delete(tag)
  close_consumer(consumer)
  raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && msg.nil?

  msg
end
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling)
# File 'lib/amqp/client/channel.rb', line 242

def basic_get(queue_name, no_ack: true)
  write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
  case (msg = @basic_gets.pop)
  when Message then msg
  when :basic_get_empty then nil
  when nil then raise Error::Closed.new(@id, *@closed)
  end
end
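The case expression above separates three outcomes of one blocking pop: a message, an "empty" marker, and nil from a closed queue. A small standalone sketch of that idea follows. The helper next_get_result, the use of String as a stand-in for Message, and IOError as the error class are all assumptions for illustration.

```ruby
# Sketch only: one blocking pop that distinguishes three outcomes.
# `next_get_result` is a hypothetical helper; the :basic_get_empty
# sentinel mirrors the listing above.
def next_get_result(gets)
  case (msg = gets.pop)
  when String then msg                          # stand-in for a Message
  when :basic_get_empty then nil                # the queue had no message
  when nil then raise IOError, "channel closed" # a closed queue pops nil
  end
end

gets = Queue.new
gets << "hello"
gets << :basic_get_empty
first = next_get_result(gets)
second = next_get_result(gets)

gets.close
closed_error =
  begin
    next_get_result(gets)
  rescue IOError => e
    e.message
  end
```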
#basic_get_empty ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 541

def basic_get_empty
  @basic_gets.push :basic_get_empty
end
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message
# File 'lib/amqp/client/channel.rb', line 421

def basic_nack(delivery_tag, multiple: false, requeue: false)
  write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  nil
end
#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publishes a message to an exchange
# File 'lib/amqp/client/channel.rb', line 272

def basic_publish(body, exchange:, routing_key: "", **properties)
  body_max = @connection.frame_max - 8
  id = @id
  mandatory = properties.delete(:mandatory) || false
  case properties.delete(:persistent)
  when true then properties[:delivery_mode] = 2
  when false then properties[:delivery_mode] = 1
  end
  if @confirm
    @unconfirmed_lock.synchronize do
      @unconfirmed.push @confirm += 1
    end
  end
  if body.bytesize.between?(1, body_max)
    write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
                FrameBytes.header(id, body.bytesize, properties),
                FrameBytes.body(id, body)
    return
  end

  write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
              FrameBytes.header(id, body.bytesize, properties)
  pos = 0
  while pos < body.bytesize # split body into multiple frame_max frames
    len = [body_max, body.bytesize - pos].min
    body_part = body.byteslice(pos, len)
    write_bytes FrameBytes.body(id, body_part)
    pos += len
  end
  nil
end
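The loop at the end of the listing above splits a body that does not fit into one frame. The same byteslice loop can be tried in isolation, as in this sketch. The helper split_body and the tiny frame_max value are assumptions for illustration; only the 8-byte overhead is taken from the source.

```ruby
# Sketch only: split a body into chunks of at most body_max bytes,
# using the same byteslice loop as basic_publish above.
# `split_body` is a hypothetical helper, not part of amqp-client.
def split_body(body, body_max)
  parts = []
  pos = 0
  while pos < body.bytesize
    len = [body_max, body.bytesize - pos].min
    parts << body.byteslice(pos, len)
    pos += len
  end
  parts
end

frame_max = 16            # assumed tiny frame size, for illustration only
body_max = frame_max - 8  # 8 bytes of frame overhead, as in the source
parts = split_body("x" * 20, body_max)
sizes = parts.map(&:bytesize) # three chunks: 8, 8 and 4 bytes
```

An empty body produces no chunks at all, which matches the listing: the single-frame branch requires at least one byte, and the loop never runs for a zero-length body.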
#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
Publish a message and block until the broker has confirmed that it received it
# File 'lib/amqp/client/channel.rb', line 309

def basic_publish_confirm(body, exchange:, routing_key: "", **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange:, routing_key:, **properties)
  wait_for_confirms
end
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with `no_ack: false`
# File 'lib/amqp/client/channel.rb', line 401

def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  expect :basic_qos_ok
  nil
end
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledged messages
# File 'lib/amqp/client/channel.rb', line 439

def basic_recover(requeue: false)
  write_bytes FrameBytes.basic_recover(@id, requeue:)
  expect :basic_recover_ok
  nil
end
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message
# File 'lib/amqp/client/channel.rb', line 430

def basic_reject(delivery_tag, requeue: false)
  write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
  nil
end
#body_delivered(body_part) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 557

def body_delivered(body_part)
  @next_body.write(body_part)
  return unless @next_body.pos == @next_body_size

  @next_msg.body = @next_body.string
end
#cancel_consumer(tag) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle consumer cancellation from the broker
# File 'lib/amqp/client/channel.rb', line 567

def cancel_consumer(tag)
  consumer = @consumers.delete(tag)
  return unless consumer

  close_consumer(consumer)
  begin
    consumer.on_cancel&.call(consumer.consumer_tag)
  rescue StandardError => e
    warn "AMQP-Client consumer on_cancel callback error: #{e.class}: #{e.message}"
  end
  nil
end
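The begin/rescue in the listing above keeps an error raised by a user-supplied on_cancel callback from propagating into the caller. A minimal sketch of that isolation pattern, assuming a hypothetical helper run_on_cancel and made-up consumer tags:

```ruby
# Sketch only: invoke an optional callback without letting its errors escape.
# `run_on_cancel` is a hypothetical helper modelled on cancel_consumer above.
def run_on_cancel(on_cancel, consumer_tag)
  on_cancel&.call(consumer_tag) # no-op when no callback was given
  :ok
rescue StandardError => e
  warn "on_cancel callback error: #{e.class}: #{e.message}"
  :failed
end

seen = []
ok = run_on_cancel(->(tag) { seen << tag }, "ctag-1")
none = run_on_cancel(nil, "ctag-2")
failed = run_on_cancel(->(_tag) { raise ArgumentError, "boom" }, "ctag-3")
```

The third call writes a warning to STDERR and still returns normally, so one faulty callback does not stop the rest of the cleanup.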
#close(reason: "", code: 200) ⇒ nil
Gracefully close channel
# File 'lib/amqp/client/channel.rb', line 63

def close(reason: "", code: 200)
  return if @closed

  write_bytes FrameBytes.channel_close(@id, reason, code)
  @closed = [:channel, code, reason]
  expect :channel_close_ok
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value { |c| close_consumer(c) }
  nil
end
#closed!(level, code, reason, classid, methodid) ⇒ nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called when channel is closed by broker
# File 'lib/amqp/client/channel.rb', line 80

def closed!(level, code, reason, classid, methodid)
  @closed = [level, code, reason, classid, methodid]
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value do |c|
    close_consumer(c)
    c.msg_q.clear # empty the queues too, messages can't be acked anymore
  end
  nil
end
#confirm(args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by Connection when an ack/nack is received from the broker
# File 'lib/amqp/client/channel.rb', line 481

def confirm(args)
  ack_or_nack, delivery_tag, multiple = *args
  @unconfirmed_lock.synchronize do
    case multiple
    when true
      idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found")
      @unconfirmed.shift(idx + 1)
    when false
      @unconfirmed.delete(delivery_tag) || raise("Delivery tag not found")
    end
    @nacked = true if ack_or_nack == :nack
    @unconfirmed_empty.broadcast if @unconfirmed.empty?
  end
end
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode; each published message will then be confirmed by the broker
# File 'lib/amqp/client/channel.rb', line 451

def confirm_select(no_wait: false)
  return if @confirm # fast path

  @unconfirmed_lock.synchronize do
    # check again in case another thread already did this while we waited for the lock
    return if @confirm

    write_bytes FrameBytes.confirm_select(@id, no_wait)
    expect :confirm_select_ok unless no_wait
    @confirm = 0
  end
  nil
end
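The listing above checks @confirm once without the lock and once more inside it, so that concurrent callers send the select frame only once. A standalone sketch of this double-checked pattern follows. The class OneTimeSetup and its counter are assumptions for illustration; the counter stands in for writing the frame.

```ruby
# Sketch only: double-checked locking so a one-time setup step runs once,
# even when several threads call it at the same time.
# `OneTimeSetup` is a hypothetical class, not part of amqp-client.
class OneTimeSetup
  attr_reader :setup_calls

  def initialize
    @lock = Mutex.new
    @ready = nil
    @setup_calls = 0
  end

  def ensure_ready
    return if @ready # fast path, no lock needed once set up

    @lock.synchronize do
      # Check again: another thread may have finished while we waited.
      return if @ready

      @setup_calls += 1 # stands in for sending the select frame
      @ready = 0        # 0 is truthy in Ruby, like @confirm = 0 above
    end
    nil
  end
end

setup = OneTimeSetup.new
Array.new(8) { Thread.new { setup.ensure_ready } }.each(&:join)
```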
#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Bind an exchange to another exchange
# File 'lib/amqp/client/channel.rb', line 135

def exchange_bind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_bind_ok
  nil
end
#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange
# File 'lib/amqp/client/channel.rb', line 112

def exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  expect :exchange_declare_ok
  nil
end
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange
# File 'lib/amqp/client/channel.rb', line 123

def exchange_delete(name, if_unused: false, no_wait: false)
  write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  expect :exchange_delete_ok unless no_wait
  nil
end
#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Unbind an exchange from another exchange
# File 'lib/amqp/client/channel.rb', line 147

def exchange_unbind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_unbind_ok
  nil
end
#header_delivered(body_size, properties) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 546

def header_delivered(body_size, properties)
  @next_msg.properties = properties
  if body_size.zero?
  else
    @next_body = StringIO.new(String.new(capacity: body_size))
    @next_body_size = body_size
  end
end
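The header frame announces the body size, and body frames are then appended to a preallocated buffer until that many bytes have arrived. The sketch below shows this reassembly on its own. The class BodyAssembler and its add method are assumptions for illustration; what the library does once a message is complete is not shown in the listings here and is not modelled.

```ruby
require "stringio"

# Sketch only: collect body frames until the announced size is reached.
# `BodyAssembler` is a hypothetical class, not part of amqp-client.
class BodyAssembler
  def initialize(body_size)
    @size = body_size
    # Preallocate the buffer so appends do not need to grow it.
    @io = StringIO.new(String.new(capacity: body_size))
  end

  # Returns the complete body once all bytes have arrived, otherwise nil.
  def add(part)
    @io.write(part)
    @io.pos == @size ? @io.string : nil
  end
end

assembler = BodyAssembler.new(10)
first = assembler.add("hello")  # 5 of 10 bytes so far
second = assembler.add("world") # all 10 bytes received
```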
#inspect ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Override #inspect
# File 'lib/amqp/client/channel.rb', line 34

def inspect
  "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?} " \
    "consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
end
#message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 536

def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
  @next_msg = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered)
end
#message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 531

def message_returned(reply_code, reply_text, exchange, routing_key)
  @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key)
end
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block. If not set, the message will just be logged to STDERR.
# File 'lib/amqp/client/channel.rb', line 95

def on_return(&block)
  @on_return = block
  nil
end
#open ⇒ Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Open the channel (called from Connection)
# File 'lib/amqp/client/channel.rb', line 50

def open
  return self if @open

  @open = true
  write_bytes FrameBytes.channel_open(@id)
  expect(:channel_open_ok)
  self
end
#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange
# File 'lib/amqp/client/channel.rb', line 205

def queue_bind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  expect :queue_bind_ok
  nil
end
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent)
# File 'lib/amqp/client/channel.rb', line 175

def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  durable = false if name.empty?
  exclusive = true if name.empty?
  auto_delete = true if name.empty?

  write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  name, message_count, consumer_count = expect(:queue_declare_ok)
  QueueOk.new(name, message_count, consumer_count)
end
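The first three statements of the listing above override the flags whenever the name is empty, so a queue declared without a name is always non-durable, exclusive and auto-deleted. The effect can be seen in isolation with this sketch; effective_queue_flags is a hypothetical helper and "jobs" is a made-up queue name.

```ruby
# Sketch only: how an empty queue name changes the effective flags,
# following the three overrides at the top of queue_declare above.
# `effective_queue_flags` is a hypothetical helper.
def effective_queue_flags(name = "", durable: true, exclusive: false, auto_delete: false)
  if name.empty?
    # An empty name asks the broker to generate one; the queue is temporary.
    durable = false
    exclusive = true
    auto_delete = true
  end
  { durable: durable, exclusive: exclusive, auto_delete: auto_delete }
end

temporary = effective_queue_flags
named = effective_queue_flags("jobs")
forced = effective_queue_flags("", durable: true) # override still applies
```

Passing durable: true together with an empty name has no effect under these rules, which is easy to miss when reading only the signature defaults.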
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue
# File 'lib/amqp/client/channel.rb', line 193

def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
  message_count, = expect :queue_delete unless no_wait
  message_count
end
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue
# File 'lib/amqp/client/channel.rb', line 216

def queue_purge(name, no_wait: false)
  write_bytes FrameBytes.queue_purge(@id, name, no_wait)
  message_count, = expect :queue_purge_ok unless no_wait
  message_count
end
#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange
# File 'lib/amqp/client/channel.rb', line 228

def queue_unbind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  expect :queue_unbind_ok
  nil
end
#reply(args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 526

def reply(args)
  @replies.push(args)
end
#tx_commit ⇒ nil
Commit a transaction; requires that the channel is in transaction mode
# File 'lib/amqp/client/channel.rb', line 509

def tx_commit
  write_bytes FrameBytes.tx_commit(@id)
  expect :tx_commit_ok
  nil
end
#tx_rollback ⇒ nil
Rollback a transaction, requires that the channel is in transaction mode
# File 'lib/amqp/client/channel.rb', line 517

def tx_rollback
  write_bytes FrameBytes.tx_rollback(@id)
  expect :tx_rollback_ok
  nil
end
#tx_select ⇒ nil
Put the channel in transaction mode. Make sure to call #tx_commit or #tx_rollback after publishing.
# File 'lib/amqp/client/channel.rb', line 501

def tx_select
  write_bytes FrameBytes.tx_select(@id)
  expect :tx_select_ok
  nil
end
#wait_for_confirms ⇒ Boolean
Block until all published messages are confirmed
# File 'lib/amqp/client/channel.rb', line 467

def wait_for_confirms
  @unconfirmed_lock.synchronize do
    until @unconfirmed.empty?
      @unconfirmed_empty.wait(@unconfirmed_lock)
      raise Error::Closed.new(@id, *@closed) if @closed
    end
    result = !@nacked
    @nacked = false # Reset for next round of publishes
    result
  end
end
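Publishing, confirm handling and wait_for_confirms cooperate through one list of outstanding delivery tags guarded by a mutex and a condition variable. The sketch below puts those pieces together in a standalone class. ConfirmTracker and its method names are assumptions modelled on the listings in this section; channel-closed handling is left out for brevity.

```ruby
# Sketch only: track unconfirmed publishes and block until all are confirmed.
# `ConfirmTracker` is a hypothetical class, not part of amqp-client.
class ConfirmTracker
  def initialize
    @unconfirmed = []
    @lock = Mutex.new
    @empty = ConditionVariable.new
    @nacked = false
    @next_tag = 0
  end

  # Record a publish and return its delivery tag.
  def published
    @lock.synchronize { (@unconfirmed << (@next_tag += 1)).last }
  end

  # kind is :ack or :nack; multiple confirms every tag up to delivery_tag.
  def confirm(kind, delivery_tag, multiple)
    @lock.synchronize do
      if multiple
        idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found")
        @unconfirmed.shift(idx + 1)
      else
        @unconfirmed.delete(delivery_tag) || raise("Delivery tag not found")
      end
      @nacked = true if kind == :nack
      @empty.broadcast if @unconfirmed.empty?
    end
  end

  # Blocks until nothing is outstanding; true when no nack was seen.
  def wait_for_confirms
    @lock.synchronize do
      @empty.wait(@lock) until @unconfirmed.empty?
      result = !@nacked
      @nacked = false # reset for the next round of publishes
      result
    end
  end
end

tracker = ConfirmTracker.new
3.times { tracker.published }
broker = Thread.new { tracker.confirm(:ack, 3, true) } # one ack covers tags 1..3
all_acked = tracker.wait_for_confirms
broker.join

tag = tracker.published
tracker.confirm(:nack, tag, false)
after_nack = tracker.wait_for_confirms
```

Because the nack flag is reset on every wait, a false result only describes the publishes since the previous call, not the whole lifetime of the channel.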