Class: AMQP::Client::Queue

Inherits:
Object
Defined in:
lib/amqp/client/queue.rb

Overview

Queue abstraction


Constructor Details

#initialize(client, name) ⇒ Queue

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Should only be initialized from the Client



# File 'lib/amqp/client/queue.rb', line 13

def initialize(client, name)
  @client = client
  @name = name
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/amqp/client/queue.rb', line 9

def name
  @name
end

Instance Method Details

#bind(exchange, binding_key: "", arguments: {}) ⇒ self

Bind the queue to an exchange



# File 'lib/amqp/client/queue.rb', line 78

def bind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(queue: @name, exchange:, binding_key:, arguments:)
  self
end
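
The listing above accepts either an exchange name or an object that responds to name, and returns self so calls can be chained. A minimal sketch of that behavior, assuming hypothetical stand-ins (RecordingClient, ExchangeStub and BindSketchQueue are not part of the library) so that it runs without a broker:

```ruby
# Hypothetical stand-in for the client: records bind calls instead of
# talking to a broker.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue:, exchange:, binding_key:, arguments:)
    @calls << { queue: queue, exchange: exchange, binding_key: binding_key, arguments: arguments }
  end
end

# Hypothetical exchange-like object; only #name matters here.
ExchangeStub = Struct.new(:name)

# Copy of the bind logic from the listing, isolated for illustration.
class BindSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key: "", arguments: {})
    # Anything that is not a String is asked for its name.
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(queue: @name, exchange: exchange, binding_key: binding_key, arguments: arguments)
    self
  end
end

client = RecordingClient.new
queue = BindSketchQueue.new(client, "orders")

# Returning self lets the two binds be chained.
result = queue.bind("amq.topic", binding_key: "orders.*")
              .bind(ExchangeStub.new("amq.direct"), binding_key: "orders")
```

Both forms end up passing a plain exchange name to the client, which is why the method can take either.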

#delete ⇒ nil

Delete the queue



# File 'lib/amqp/client/queue.rb', line 104

def delete
  @client.delete_queue(@name)
  nil
end

#get(no_ack: false) ⇒ Message?

Get a message from the queue



# File 'lib/amqp/client/queue.rb', line 69

def get(no_ack: false)
  @client.get(@name, no_ack:)
end
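
Because the return type is Message?, a caller has to handle nil when the queue is empty. One way to use that is a polling loop that drains the queue. The sketch below assumes a hypothetical InMemoryClient whose get returns the next item or nil, and uses plain strings in place of real Message objects:

```ruby
# Hypothetical in-memory client: get hands out the next item, or nil when
# nothing is left. Strings stand in for Message objects.
class InMemoryClient
  def initialize(messages)
    @messages = messages.dup
  end

  def get(_queue_name, no_ack: false)
    @messages.shift
  end
end

# Copy of the get logic from the listing, isolated for illustration.
class GetSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def get(no_ack: false)
    @client.get(@name, no_ack: no_ack)
  end
end

queue = GetSketchQueue.new(InMemoryClient.new(%w[first second]), "inbox")

# Keep polling until get returns nil, meaning the queue is empty.
drained = []
while (message = queue.get(no_ack: true))
  drained << message
end
```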

#publish(body, **properties) ⇒ Queue

Publish to the queue, wait for confirm

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value makes the message transient

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Raises:



# File 'lib/amqp/client/queue.rb', line 24

def publish(body, **properties)
  @client.publish(body, exchange: "", routing_key: @name, **properties)
  self
end
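
As the listing shows, publishing to a queue is a publish to the default exchange ("") with the queue name as routing key, with all properties passed through unchanged. A minimal sketch that makes the forwarded arguments visible, assuming a hypothetical CapturingClient in place of the real client:

```ruby
# Hypothetical client double that captures publish arguments.
class CapturingClient
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange:, routing_key:, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Copy of the publish logic from the listing, isolated for illustration.
class PublishSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    # Default exchange plus queue name as routing key targets this queue.
    @client.publish(body, exchange: "", routing_key: @name, **properties)
    self
  end
end

client = CapturingClient.new
queue = PublishSketchQueue.new(client, "tasks")

queue.publish("hello", persistent: true, content_type: "text/plain")
captured = client.published.first
```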

#publish_and_forget(body, **properties) ⇒ Queue

Publish to the queue, without waiting for confirm

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value makes the message transient

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Raises:



# File 'lib/amqp/client/queue.rb', line 34

def publish_and_forget(body, **properties)
  @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
  self
end

#purge ⇒ self

Purge/empty the queue



# File 'lib/amqp/client/queue.rb', line 97

def purge
  @client.purge(@name)
  self
end

#subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer

Subscribe/consume from the queue

Yields:

  • (Message)

    Delivered message from the queue



# File 'lib/amqp/client/queue.rb', line 55

def subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true,
              on_cancel: nil, arguments: {})
  @client.subscribe(@name, no_ack:, exclusive:, prefetch:, worker_threads:, on_cancel:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
end
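
The wrapper in the listing acknowledges a message after the block returns, and rejects it (requeueing when requeue_on_reject is true) before re-raising if the block fails. A minimal sketch of that flow, assuming hypothetical doubles: FakeMessage records how it was settled, and FakeClient delivers messages synchronously and swallows handler errors. That last part is an assumption made for the demo, not a statement about how the real client treats the re-raised exception.

```ruby
# Hypothetical message double that records how it was settled.
class FakeMessage
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = nil
  end

  def ack
    @outcome = :acked
  end

  def reject(requeue:)
    @outcome = requeue ? :requeued : :rejected
  end
end

# Hypothetical client double: delivers preloaded messages one by one and
# swallows handler errors so that every message is delivered.
class FakeClient
  def initialize(messages)
    @messages = messages
  end

  def subscribe(_queue_name, **_options)
    @messages.each do |message|
      yield message
    rescue StandardError
      next
    end
  end
end

# Same ack/reject wrapper as the subscribe listing, written as a method.
def subscribe_sketch(client, name, no_ack: false, requeue_on_reject: true)
  client.subscribe(name, no_ack: no_ack) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
end

good = FakeMessage.new("ok")
bad = FakeMessage.new("boom")

subscribe_sketch(FakeClient.new([good, bad]), "jobs") do |message|
  raise ArgumentError, "cannot process" if message.body == "boom"
end
```

With no_ack: true neither ack nor reject is called, so a handler that needs manual settlement has to leave no_ack at its default.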

#unbind(exchange, binding_key: "", arguments: {}) ⇒ self

Unbind the queue from an exchange



# File 'lib/amqp/client/queue.rb', line 89

def unbind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(queue: @name, exchange:, binding_key:, arguments:)
  self
end