Class: AMQP::Client::Queue
- Inherits: Object (Object → AMQP::Client::Queue)
- Defined in: lib/amqp/client/queue.rb
Overview
Queue abstraction
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #bind(exchange, binding_key: "", arguments: {}) ⇒ self
  Bind the queue to an exchange.
- #delete ⇒ nil
  Delete the queue.
- #get(no_ack: false) ⇒ Message?
  Get a message from the queue.
- #initialize(client, name) ⇒ Queue (constructor, private)
  Should only be initialized from the Client.
- #publish(body, **properties) ⇒ Queue
  Publish to the queue, wait for confirm.
- #publish_and_forget(body, **properties) ⇒ Queue
  Publish to the queue, without waiting for confirm.
- #purge ⇒ self
  Purge/empty the queue.
- #subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
  Subscribe/consume from the queue.
- #unbind(exchange, binding_key: "", arguments: {}) ⇒ self
  Unbind the queue from an exchange.
Constructor Details
#initialize(client, name) ⇒ Queue
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Should only be initialized from the Client
# File 'lib/amqp/client/queue.rb', line 13
def initialize(client, name)
  @client = client
  @name = name
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/amqp/client/queue.rb', line 9
def name
  @name
end
Instance Method Details
#bind(exchange, binding_key: "", arguments: {}) ⇒ self
Bind the queue to an exchange
# File 'lib/amqp/client/queue.rb', line 78
def bind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(queue: @name, exchange:, binding_key:, arguments:)
  self
end
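The `exchange.is_a?(String)` check in the listing above suggests that `#bind` accepts either an exchange name or any object that responds to `name`, and the `self` return value lets calls chain. The following is a minimal sketch of that behaviour, not the library itself: `RecordingBindClient`, `BindSketchQueue`, `ExchangeLike`, and `bind_demo` are hypothetical names introduced only for illustration, and no broker is involved.

```ruby
# Hypothetical stand-in for the real client: it only records bind calls.
class RecordingBindClient
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  def bind(queue:, exchange:, binding_key:, arguments:)
    @bindings << { queue: queue, exchange: exchange,
                   binding_key: binding_key, arguments: arguments }
  end
end

# Hypothetical copy of the bind logic from the listing, nothing more.
class BindSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key: "", arguments: {})
    # Accept a plain name or any object exposing #name.
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(queue: @name, exchange: exchange,
                 binding_key: binding_key, arguments: arguments)
    self
  end
end

# Any object with a #name method can be passed as the exchange (assumption
# drawn from the is_a?(String) check, not from separate documentation).
ExchangeLike = Struct.new(:name)

def bind_demo
  client = RecordingBindClient.new
  queue = BindSketchQueue.new(client, "orders")

  # Returning self makes the two bind calls chainable.
  queue.bind("amq.topic", binding_key: "orders.#")
       .bind(ExchangeLike.new("amq.direct"), binding_key: "orders")

  client.bindings
end

bind_demo.each { |binding| puts binding.inspect }
```

Both calls reach the client with a plain string as the exchange, which is why passing an exchange object and passing its name appear interchangeable here.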
#delete ⇒ nil
Delete the queue
# File 'lib/amqp/client/queue.rb', line 104
def delete
  @client.delete_queue(@name)
  nil
end
#get(no_ack: false) ⇒ Message?
Get a message from the queue
# File 'lib/amqp/client/queue.rb', line 69
def get(no_ack: false)
  @client.get(@name, no_ack:)
end
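The `Message?` return type indicates that `#get` may return `nil`, presumably when the queue has nothing to deliver. A caller that polls can therefore loop until `nil` comes back. The sketch below illustrates that pattern with a hypothetical `InMemoryGetQueue` stand-in and a hypothetical `drain` helper; it assumes `nil` means "empty" and does not talk to a broker.

```ruby
# Hypothetical in-memory stand-in for a queue that supports #get.
class InMemoryGetQueue
  def initialize(bodies)
    @bodies = bodies.dup
  end

  # Returns the next item, or nil when nothing is left
  # (assumed to mirror the Message? return type).
  def get(no_ack: false)
    @bodies.shift
  end
end

# Hypothetical helper: poll until #get returns nil and collect the results.
def drain(queue)
  drained = []
  while (message = queue.get(no_ack: true))
    drained << message
  end
  drained
end

puts drain(InMemoryGetQueue.new(%w[first second third])).inspect
```

With `no_ack: false` (the default in the signature above) the caller would also be responsible for acknowledging each message; the sketch sidesteps that by passing `no_ack: true`.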
#publish(body, **properties) ⇒ Queue
Publish to the queue, wait for confirm
# File 'lib/amqp/client/queue.rb', line 24
def publish(body, **properties)
  @client.publish(body, exchange: "", routing_key: @name, **properties)
  self
end
#publish_and_forget(body, **properties) ⇒ Queue
Publish to the queue, without waiting for confirm
# File 'lib/amqp/client/queue.rb', line 34
def publish_and_forget(body, **properties)
  @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
  self
end
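Both `#publish` and `#publish_and_forget` delegate to the client with `exchange: ""` and `routing_key: @name`. In AMQP 0-9-1 the empty-named default exchange routes a message to the queue whose name equals the routing key, which is how these methods publish "to the queue". The sketch below shows only that delegation: `RecordingPublishClient`, `PublishSketchQueue`, and `publish_demo` are hypothetical names, and the `confirmed` flag merely marks which client method was called rather than modelling real publisher confirms.

```ruby
# Hypothetical stand-in client: records what it is asked to publish.
class RecordingPublishClient
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange:, routing_key:, **properties)
    record(true, body, exchange, routing_key, properties)
  end

  def publish_and_forget(body, exchange:, routing_key:, **properties)
    record(false, body, exchange, routing_key, properties)
  end

  private

  def record(confirmed, body, exchange, routing_key, properties)
    @published << { confirmed: confirmed, body: body, exchange: exchange,
                    routing_key: routing_key, properties: properties }
  end
end

# Hypothetical copy of the two publish methods from the listings.
class PublishSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    # Default exchange ("") plus the queue name as routing key.
    @client.publish(body, exchange: "", routing_key: @name, **properties)
    self
  end

  def publish_and_forget(body, **properties)
    @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
    self
  end
end

def publish_demo
  client = RecordingPublishClient.new
  PublishSketchQueue.new(client, "orders")
                    .publish("created", content_type: "text/plain")
                    .publish_and_forget("heartbeat")
  client.published
end

publish_demo.each { |entry| puts entry.inspect }
```

Extra keyword arguments such as `content_type:` pass through untouched as message properties, so the queue wrapper adds nothing beyond the exchange and routing key.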
#purge ⇒ self
Purge/empty the queue
# File 'lib/amqp/client/queue.rb', line 97
def purge
  @client.purge(@name)
  self
end
#subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Subscribe/consume from the queue
# File 'lib/amqp/client/queue.rb', line 55
def subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true,
              on_cancel: nil, arguments: {})
  @client.subscribe(@name, no_ack:, exclusive:, prefetch:, worker_threads:, on_cancel:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
end
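The block that `#subscribe` hands to the client wraps the caller's block: a message is acknowledged after the block returns, rejected (with `requeue: requeue_on_reject`) if the block raises a `StandardError`, and neither when `no_ack` is true. The error is re-raised in every case. The sketch below isolates that control flow so it can be run without a broker; `FakeMessage`, `handle_delivery`, and `delivery_outcomes` are hypothetical names, not part of the library.

```ruby
# Hypothetical stand-in for a delivered message: records ack/reject calls.
class FakeMessage
  attr_reader :body, :events

  def initialize(body)
    @body = body
    @events = []
  end

  def ack
    @events << :ack
  end

  def reject(requeue:)
    @events << (requeue ? :reject_requeue : :reject_drop)
  end
end

# Mirrors the wrapper block from the listing: ack on success,
# reject and re-raise on failure, do neither when no_ack is set.
def handle_delivery(message, no_ack: false, requeue_on_reject: true)
  yield message
  message.ack unless no_ack
rescue StandardError => e
  message.reject(requeue: requeue_on_reject) unless no_ack
  raise e
end

def delivery_outcomes
  ok = FakeMessage.new("ok")
  handle_delivery(ok) { |m| m.body.upcase }

  requeued = FakeMessage.new("retry me")
  begin
    handle_delivery(requeued) { |_m| raise ArgumentError, "cannot process" }
  rescue ArgumentError
    # The wrapper re-raises after rejecting, so the caller still sees the error.
  end

  dropped = FakeMessage.new("drop me")
  begin
    handle_delivery(dropped, requeue_on_reject: false) { |_m| raise ArgumentError, "cannot process" }
  rescue ArgumentError
    # Same as above, but the reject is issued without requeue.
  end

  unacked = FakeMessage.new("fire and forget")
  handle_delivery(unacked, no_ack: true) { |m| m.body }

  { ok: ok.events, requeued: requeued.events,
    dropped: dropped.events, unacked: unacked.events }
end

puts delivery_outcomes.inspect
```

Because the default is `requeue_on_reject: true`, a block that fails on every attempt may see the same message redelivered repeatedly; passing `requeue_on_reject: false` is one way to avoid that, at the cost of the broker discarding or dead-lettering the message.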
#unbind(exchange, binding_key: "", arguments: {}) ⇒ self
Unbind the queue from an exchange
# File 'lib/amqp/client/queue.rb', line 89
def unbind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(queue: @name, exchange:, binding_key:, arguments:)
  self
end