Class: AMQP::Client::Queue

Inherits:
Object
Defined in:
lib/amqp/client/queue.rb

Overview

Queue abstraction

Constructor Details

#initialize(client, name) ⇒ Queue

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Should only be initialized from the Client



# File 'lib/amqp/client/queue.rb', line 13

def initialize(client, name)
  @client = client
  @name = name
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/amqp/client/queue.rb', line 9

def name
  @name
end

Instance Method Details

#bind(exchange, binding_key: "", arguments: {}) ⇒ self

Bind the queue to an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to bind to, or the exchange object itself

  • binding_key (String) (defaults to: "")

    Binding key that message routing keys are matched against; how it is matched depends on the exchange type

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for header exchanges)

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 78

def bind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(queue: @name, exchange:, binding_key:, arguments:)
  self
end
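
As a minimal sketch of how #bind normalizes its exchange argument and supports chaining, the snippet below mirrors the method body from the listing above. RecordingClient, SketchExchange and SketchQueue are hypothetical stand-ins (not part of the library) so that the example runs without a broker; the exchange and queue names are illustrative.

```ruby
# Hypothetical stand-in for the real client: it only records bind calls.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue:, exchange:, binding_key:, arguments:)
    @calls << { queue: queue, exchange: exchange, binding_key: binding_key, arguments: arguments }
  end
end

# Illustrative exchange object; bind only needs it to respond to #name.
SketchExchange = Struct.new(:name)

# Mirrors the bind logic from the source listing above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key: "", arguments: {})
    # Anything that is not a String is assumed to respond to #name.
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(queue: @name, exchange: exchange, binding_key: binding_key, arguments: arguments)
    self
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "orders")

# Returning self allows several bindings to be chained.
result = queue.bind("amq.topic", binding_key: "orders.*")
              .bind(SketchExchange.new("amq.headers"), arguments: { "x-match" => "all", "region" => "eu" })
```

Both calls reach the client with a plain exchange name, whether a String or an exchange object was passed in.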

#delete ⇒ nil

Delete the queue

Returns:

  • (nil)


# File 'lib/amqp/client/queue.rb', line 104

def delete
  @client.delete_queue(@name)
  nil
end

#get(no_ack: false) ⇒ Message?

Get a message from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    When false, the message has to be manually acknowledged (or rejected)

Returns:

  • (Message, nil)

    The message from the queue or nil if the queue is empty



# File 'lib/amqp/client/queue.rb', line 69

def get(no_ack: false)
  @client.get(@name, no_ack:)
end
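
Because #get returns nil when the queue is empty, it can drive a simple polling loop. The sketch below assumes hypothetical in-memory stand-ins (SketchMessage, InMemoryClient, SketchQueue) instead of a real broker connection; only the shape of the get call is taken from the listing above.

```ruby
# Hypothetical message stand-in that remembers whether it was acknowledged.
class SketchMessage
  attr_reader :body

  def initialize(body)
    @body = body
    @acked = false
  end

  def ack
    @acked = true
  end

  def acked?
    @acked
  end
end

# Hypothetical client stand-in backed by an in-memory array.
class InMemoryClient
  def initialize(messages)
    @messages = messages
  end

  # Returns the next message, or nil when nothing is left.
  def get(_queue_name, no_ack:)
    @messages.shift
  end
end

# Mirrors the get delegation from the source listing above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def get(no_ack: false)
    @client.get(@name, no_ack: no_ack)
  end
end

messages = [SketchMessage.new("a"), SketchMessage.new("b")]
queue = SketchQueue.new(InMemoryClient.new(messages.dup), "jobs")

processed = []
# get returns nil once the queue is empty, which ends the loop.
while (message = queue.get)
  processed << message.body
  message.ack # needed because no_ack defaults to false
end
```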

#publish(body, **properties) ⇒ Queue

Publish to the queue, wait for confirm

Parameters:

  • body (Object)

    The message body will be encoded if any matching codec is found in the client’s codec registry

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value makes the message transient

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Queue)

    self

Raises:



# File 'lib/amqp/client/queue.rb', line 24

def publish(body, **properties)
  @client.publish(body, exchange: "", routing_key: @name, **properties)
  self
end
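
The listing above publishes through the default exchange (the empty string) with the queue name as routing key, and forwards every property unchanged. A minimal sketch of that delegation, using a hypothetical RecordingClient and SketchQueue in place of the real classes so that it runs without a broker:

```ruby
# Hypothetical stand-in for the real client: it only records publish calls.
class RecordingClient
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange:, routing_key:, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Mirrors the publish delegation from the source listing above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    # Default exchange plus the queue name as routing key targets this queue.
    @client.publish(body, exchange: "", routing_key: @name, **properties)
    self
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "emails")

# publish returns the queue, so calls can be chained.
returned = queue.publish("hello", persistent: true, content_type: "text/plain")
                .publish("world")
```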

#publish_and_forget(body, **properties) ⇒ Queue

Publish to the queue, without waiting for confirm

Parameters:

  • body (Object)

    The message body will be encoded if any matching codec is found in the client’s codec registry

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value makes the message transient

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Queue)

    self

Raises:



# File 'lib/amqp/client/queue.rb', line 34

def publish_and_forget(body, **properties)
  @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
  self
end

#purge ⇒ self

Purge/empty the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 97

def purge
  @client.purge(@name)
  self
end

#subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer

Subscribe/consume from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    If true, messages are automatically acknowledged by the server upon delivery. If false, messages are acknowledged only after the block completes successfully; if the block raises an exception, the message is rejected and can be optionally requeued. You can of course handle the ack/reject in the block yourself. (Default: false)

  • exclusive (Boolean) (defaults to: false)

    When true only a single consumer can consume from the queue at a time

  • prefetch (Integer) (defaults to: 1)

    Number of messages to prefetch; only applies to consumers where no_ack is false

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages; 0 means that the thread calling this method will be blocked

  • requeue_on_reject (Boolean) (defaults to: true)

    If true, messages that are rejected due to an exception in the block will be requeued. Only relevant if no_ack is false. (Default: true)

  • on_cancel (Proc) (defaults to: nil)

    Optional proc that will be called if the consumer is cancelled by the broker. The proc will be called with the consumer tag as the only argument.

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (Consumer)

    The consumer object, which can be used to cancel the consumer



# File 'lib/amqp/client/queue.rb', line 55

def subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true,
              on_cancel: nil, arguments: {})
  @client.subscribe(@name, no_ack:, exclusive:, prefetch:, worker_threads:, on_cancel:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
end
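
The ack-on-success, reject-on-exception wrapping in the listing above can be exercised in isolation. The following is a sketch under stated assumptions: SketchMessage, InMemoryClient and SketchQueue are hypothetical stand-ins, and the stand-in client delivers messages synchronously and swallows the re-raised error, which is not a claim about how the real client behaves.

```ruby
# Hypothetical message stand-in that records how it was settled.
class SketchMessage
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = nil
  end

  def ack
    @outcome = :acked
  end

  def reject(requeue: false)
    @outcome = requeue ? :requeued : :rejected
  end
end

# Hypothetical client stand-in: delivers each message synchronously.
class InMemoryClient
  def initialize(messages)
    @messages = messages
  end

  def subscribe(_queue_name, **_options)
    @messages.each do |message|
      yield message
    rescue StandardError
      nil # the sketch ignores the re-raised error and moves on
    end
    :consumer_stub
  end
end

# Mirrors the ack/reject wrapping from the source listing above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def subscribe(no_ack: false, requeue_on_reject: true, **options)
    @client.subscribe(@name, no_ack: no_ack, **options) do |message|
      yield message
      message.ack unless no_ack
    rescue StandardError => e
      message.reject(requeue: requeue_on_reject) unless no_ack
      raise e
    end
  end
end

ok = SketchMessage.new("ok")
bad = SketchMessage.new("boom")
queue = SketchQueue.new(InMemoryClient.new([ok, bad]), "jobs")

queue.subscribe(requeue_on_reject: false) do |message|
  raise ArgumentError, "cannot process" if message.body == "boom"
end
```

With requeue_on_reject left at its default of true, the failing message would be requeued rather than dropped.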

#unbind(exchange, binding_key: "", arguments: {}) ⇒ self

Unbind the queue from an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to unbind from, or the exchange object itself

  • binding_key (String) (defaults to: "")

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 89

def unbind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(queue: @name, exchange:, binding_key:, arguments:)
  self
end