Class: Bunny::Queue

Inherits:
Object
Defined in:
lib/bunny/queue.rb

Overview

Represents an AMQP 0.9.1 queue.

Defined Under Namespace

Modules: Types, XArgs

Constructor Details

#initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • channel (Bunny::Channel)

    Channel this queue will use.

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to make RabbitMQ generate a unique one.

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (usable only by this connection and removed when the connection is closed)?

  • :type (String) — default: nil

    Type of the declared queue (classic, quorum or stream)

  • :arguments (Hash) — default: nil

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/bunny/queue.rb', line 54

def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {})
  # old Bunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel
  @name             = name
  @options          = self.class.add_default_options(name, opts)

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @type             = @options[:type]

  @arguments        = if @type and !@type.empty? then
    (@options[:arguments] || {}).merge({XArgs::QUEUE_TYPE => @type})
  else
    @options[:arguments]
  end
  verify_type!(@arguments)
  # reassigns updated and verified arguments because Bunny::Channel#declare_queue
  # accepts a map of options
  @options[:arguments] = @arguments

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end
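
The way the constructor folds :type into :arguments can be sketched in plain Ruby without Bunny or a broker. This is a minimal illustration, not the library's code: effective_arguments is a hypothetical helper, and QUEUE_TYPE_KEY is assumed to match XArgs::QUEUE_TYPE (inferred from the "x-queue-type" key read by verify_type!).

```ruby
# Assumption: stands in for XArgs::QUEUE_TYPE.
QUEUE_TYPE_KEY = "x-queue-type"

# Hypothetical helper mirroring the constructor: a non-empty :type is merged
# into :arguments; otherwise :arguments is passed through (possibly nil).
def effective_arguments(opts)
  type = opts[:type]
  if type && !type.empty?
    (opts[:arguments] || {}).merge(QUEUE_TYPE_KEY => type)
  else
    opts[:arguments]
  end
end

with_type    = effective_arguments(:type => "quorum", :arguments => { "x-max-length" => 10 })
without_type = effective_arguments(:durable => true)
```

Because the type is merged last, a :type option takes precedence over an "x-queue-type" entry already present in :arguments.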

Instance Attribute Details

#channel ⇒ Bunny::Channel (readonly)

Returns the channel this queue uses.

Returns:

  • (Bunny::Channel)

    Channel this queue uses

# File 'lib/bunny/queue.rb', line 34

def channel
  @channel
end

#name ⇒ String (readonly)

Returns the queue name.

Returns:

  • (String)

    Queue name

# File 'lib/bunny/queue.rb', line 36

def name
  @name
end

#options ⇒ Hash (readonly)

Returns the options this queue was created with.

Returns:

  • (Hash)

    Options this queue was created with

# File 'lib/bunny/queue.rb', line 38

def options
  @options
end

Class Method Details

.verify_type!(args0 = {}) ⇒ Object



# File 'lib/bunny/queue.rb', line 357

def self.verify_type!(args0 = {})
  # be extra defensive
  args = args0 || {}
  q_type = args["x-queue-type"] || args[:"x-queue-type"]
  throw ArgumentError.new(
    "unsupported queue type #{q_type.inspect}, supported ones: #{Types::KNOWN.join(', ')}") if (q_type and !Types.known?(q_type))
end
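
The lookup performed by verify_type! accepts the queue type under either a String or a Symbol key and tolerates nil arguments. The sketch below reproduces only that lookup in plain Ruby. KNOWN_TYPES is an assumption based on the three types named in the :type option description, and both helper names are hypothetical.

```ruby
# Assumption: the supported types are the three listed for the :type option.
KNOWN_TYPES = %w[classic quorum stream].freeze

# Reads the queue type from an arguments hash, accepting either key style.
def queue_type_from(args)
  args ||= {} # tolerate nil, as verify_type! does
  args["x-queue-type"] || args[:"x-queue-type"]
end

# True when no type is set or the type is one of KNOWN_TYPES.
def acceptable_queue_type?(args)
  q_type = queue_type_from(args)
  q_type.nil? || KNOWN_TYPES.include?(q_type)
end

checks = {
  :no_arguments => acceptable_queue_type?(nil),
  :string_key   => acceptable_queue_type?("x-queue-type" => "quorum"),
  :symbol_key   => acceptable_queue_type?(:"x-queue-type" => "stream"),
  :unknown_type => acceptable_queue_type?("x-queue-type" => "bogus")
}
```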

Instance Method Details

#arguments ⇒ Hash

Returns additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/bunny/queue.rb', line 116

def arguments
  @arguments
end

#auto_delete? ⇒ Boolean

Returns true if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).

# File 'lib/bunny/queue.rb', line 103

def auto_delete?
  @auto_delete
end

#bind(exchange, opts = {}) ⇒ Object

Binds queue to an exchange

Parameters:

  • exchange (Bunny::Exchange, String)

    Exchange to bind to

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/bunny/queue.rb', line 140

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end
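
The bookkeeping half of #bind (recording bindings for later recovery) can be sketched without a broker. BindingLedger and FakeExchange are hypothetical stand-ins; only the normalisation and de-duplication logic is taken from the method above.

```ruby
# Hypothetical stand-in for the part of Queue that tracks bindings.
class BindingLedger
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  # Accepts an exchange object (anything responding to #name) or a name string.
  def record(exchange, opts = {})
    exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange
    binding = { :exchange    => exchange_name,
                :routing_key => (opts[:routing_key] || opts[:key]),
                :arguments   => opts[:arguments] }
    # Identical bindings are stored once, so recovery does not rebind twice.
    @bindings.push(binding) unless @bindings.include?(binding)
    self
  end
end

FakeExchange = Struct.new(:name) # stand-in for Bunny::Exchange

ledger = BindingLedger.new
ledger.record(FakeExchange.new("logs"), :routing_key => "info")
ledger.record("logs", :key => "info")          # same binding, :key alias
ledger.record("logs", :routing_key => "error") # distinct routing key
```

Returning self from record mirrors how #bind allows calls to be chained.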

#consumer_count ⇒ Integer

Returns how many active consumers the queue has.

Returns:

  • (Integer)

    How many active consumers the queue has

# File 'lib/bunny/queue.rb', line 352

def consumer_count
  s = self.status
  s[:consumer_count]
end

#declare! ⇒ Object

# File 'lib/bunny/queue.rb', line 406

def declare!
  queue_declare_ok = @channel.queue_declare(@name, @options)
  @name = queue_declare_ok.queue
end

#delete(opts = {}) ⇒ Object

Deletes the queue

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • :if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

# File 'lib/bunny/queue.rb', line 322

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end

#durable? ⇒ Boolean

Returns true if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).

# File 'lib/bunny/queue.rb', line 89

def durable?
  @durable
end

#exclusive? ⇒ Boolean

Returns true if this queue was declared as exclusive (usable only by the declaring connection and removed when that connection closes).

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (usable only by the declaring connection and removed when that connection closes)

# File 'lib/bunny/queue.rb', line 96

def exclusive?
  @exclusive
end

#inspect ⇒ Object

# File 'lib/bunny/queue.rb', line 125

def inspect
  to_s
end

#message_count ⇒ Integer

Returns how many messages the queue has ready (i.e. not yet delivered to a consumer; delivered but unacknowledged messages are not counted).

Returns:

  • (Integer)

    How many messages the queue has ready (i.e. not yet delivered to a consumer; delivered but unacknowledged messages are not counted)

# File 'lib/bunny/queue.rb', line 346

def message_count
  s = self.status
  s[:message_count]
end

#pop(opts = {:manual_ack => false}, &block) ⇒ Array Also known as: get

Returns a triple of delivery info, message properties and message content. If the queue is empty, all three will be nil.

Examples:

conn = Bunny.new
conn.start

ch   = conn.create_channel
q = ch.queue("test1")
x = ch.default_exchange
x.publish("Hello, everybody!", :routing_key => 'test1')

delivery_info, properties, payload = q.pop

puts "This is the message: #{payload}\n\n"
conn.close

Parameters:

  • opts (Hash) (defaults to: {:manual_ack => false})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: false

    DEPRECATED. Use :manual_ack instead.

  • :manual_ack (Boolean) — default: false

    Will the message be acknowledged manually?

Returns:

  • (Array)

    Triple of delivery info, message properties and message content. If the queue is empty, all three will be nil.

# File 'lib/bunny/queue.rb', line 271

def pop(opts = {:manual_ack => false}, &block)
  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  get_response, properties, content = @channel.basic_get(@name, opts)

  if block
    if properties
      di = GetResponse.new(get_response, @channel)
      mp = MessageProperties.new(properties)

      block.call(di, mp, content)
    else
      block.call(nil, nil, nil)
    end
  else
    if properties
      di = GetResponse.new(get_response, @channel)
      mp = MessageProperties.new(properties)
      [di, mp, content]
    else
      [nil, nil, nil]
    end
  end
end
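
Since #pop returns three nils for an empty queue, callers usually check the payload before using it. The following sketch shows that pattern against a hypothetical FakeQueue stand-in, so it runs without Bunny or a broker; with a real queue the first two elements would be delivery info and message properties objects rather than plain hashes.

```ruby
# Hypothetical stand-in that mimics the return shape of Queue#pop.
class FakeQueue
  def initialize(messages)
    @messages = messages
  end

  def pop
    payload = @messages.shift
    payload ? [{ :routing_key => "test1" }, {}, payload] : [nil, nil, nil]
  end
end

# Pops until the queue reports empty, collecting payloads.
def drain(queue)
  payloads = []
  loop do
    _delivery_info, _properties, payload = queue.pop
    break if payload.nil? # empty queue: all three values are nil
    payloads << payload
  end
  payloads
end

drained = drain(FakeQueue.new(["a", "b"]))
```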

#publish(payload, opts = {}) ⇒ Object

Publishes a message to the queue via the default exchange. Takes the same arguments as Exchange#publish; the :routing_key option is always set to the queue name.

# File 'lib/bunny/queue.rb', line 306

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end
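
The option merge in #publish means a caller-supplied :routing_key is overridden by the queue name. A small plain-Ruby sketch of just that merge (publish_options is a hypothetical helper, not part of Bunny):

```ruby
# Hypothetical helper reproducing the merge performed by Queue#publish.
def publish_options(queue_name, opts = {})
  # Hash#merge returns a new hash; the caller's hash is left untouched.
  opts.merge(:routing_key => queue_name)
end

caller_opts = { :routing_key => "ignored", :persistent => true }
merged      = publish_options("test1", caller_opts)
```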

#purge(opts = {}) ⇒ Object

Purges a queue (removes all messages from it)



# File 'lib/bunny/queue.rb', line 330

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end

#recover_bindings ⇒ Object

# File 'lib/bunny/queue.rb', line 392

def recover_bindings
  @bindings.each do |b|
    # TODO: inject and use logger
    # puts "Recovering binding #{b.inspect}"
    self.bind(b[:exchange], b)
  end
end

#recover_from_network_failure ⇒ Object

# File 'lib/bunny/queue.rb', line 370

def recover_from_network_failure
  if self.server_named?
    old_name = @name.dup
    @name    = AMQ::Protocol::EMPTY_STRING

    @channel.deregister_queue_named(old_name)
  end

  # TODO: inject and use logger
  # puts "Recovering queue #{@name}"
  begin
    declare! unless @options[:no_declare]

    @channel.register_queue(self)
  rescue Exception => e
    # TODO: inject and use logger
    puts "Caught #{e.inspect} while redeclaring and registering #{@name}!"
  end
  recover_bindings
end
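
The name handling during recovery can be illustrated with a small simulation. A server-named queue resets its name to the empty string so that a fresh name is generated on redeclaration, while a client-named queue keeps its name. FakeBroker and SketchQueue are hypothetical stand-ins, and the "amq.gen-" prefix only imitates the style of broker-generated names.

```ruby
# Hypothetical stand-in for the broker side of queue.declare.
class FakeBroker
  def initialize
    @counter = 0
  end

  # Returns the given name, or generates one when the name is empty.
  def declare(name)
    return name unless name.empty?
    @counter += 1
    "amq.gen-#{@counter}"
  end
end

# Hypothetical stand-in for the name logic of Queue.
class SketchQueue
  attr_reader :name

  def initialize(broker, name = "")
    @broker       = broker
    @server_named = name.empty?
    @name         = @broker.declare(name)
  end

  def recover
    @name = "" if @server_named # ask for a fresh generated name
    @name = @broker.declare(@name)
  end
end

broker       = FakeBroker.new
server_named = SketchQueue.new(broker)
first_name   = server_named.name
client_named = SketchQueue.new(broker, "orders")
server_named.recover
client_named.recover
```

One consequence is that code holding on to the old name of a server-named queue should re-read it from the queue object after recovery.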

#server_named? ⇒ Boolean

Returns true if this queue was declared as server named.

Returns:

  • (Boolean)

    true if this queue was declared as server named.

# File 'lib/bunny/queue.rb', line 110

def server_named?
  @server_named
end

#status ⇒ Hash

Returns a hash with information about the number of queue messages and consumers.

Returns:

  • (Hash)

    A hash with information about the number of queue messages and consumers

# File 'lib/bunny/queue.rb', line 339

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end

#subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :manual_ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block) ⇒ Object

Adds a consumer to the queue (subscribes for message deliveries).

Parameters:

  • opts (Hash) (defaults to: { :consumer_tag => @channel.generate_consumer_tag, :manual_ack => false, :exclusive => false, :block => false, :on_cancellation => nil })

    Options

Options Hash (opts):

  • :ack (Boolean) — default: false

    DEPRECATED. Use :manual_ack instead.

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let Bunny generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

# File 'lib/bunny/queue.rb', line 197

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :manual_ack      => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !opts[:manual_ack],
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end
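
How #subscribe turns its options into consumer settings can be sketched in isolation. The helper below is hypothetical and makes one assumption not confirmed by this page: that the fourth argument passed to Consumer.new is a no-ack flag, which is why :manual_ack is negated. It also shows that the deprecated :ack option, when present, overrides :manual_ack.

```ruby
# Hypothetical helper mirroring the option handling in Queue#subscribe.
def consumer_settings(opts = {})
  opts = opts.dup # the real method mutates the caller's hash; this sketch does not
  opts[:manual_ack] = opts[:ack] unless opts[:ack].nil?
  {
    :no_ack    => !opts[:manual_ack], # assumed meaning of the negated flag
    :exclusive => opts[:exclusive],
    :arguments => opts[:arguments]
  }
end

automatic  = consumer_settings({})
manual     = consumer_settings(:manual_ack => true)
deprecated = consumer_settings(:ack => true, :manual_ack => false)
```

Note that the default hash in the method signature applies only when no options are passed at all; a partial hash leaves the omitted keys as nil rather than false.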

#subscribe_with(consumer, opts = {:block => false}) ⇒ Object

Adds a consumer object to the queue (subscribes for message deliveries).

Parameters:

  • consumer (Bunny::Consumer)

    An instance of a Bunny::Consumer subclass that implements the consumer interface

  • opts (Hash) (defaults to: {:block => false})

    Options

Options Hash (opts):

  • :block (Boolean) — default: false

    Should the call block the calling thread?

# File 'lib/bunny/queue.rb', line 240

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end

#to_s ⇒ Object

# File 'lib/bunny/queue.rb', line 120

def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "<#{self.class.name}:#{oid} @name=\"#{name}\" channel=#{@channel.to_s} @durable=#{@durable} @auto_delete=#{@auto_delete} @exclusive=#{@exclusive} @arguments=#{@arguments}>"
end

#unbind(exchange, opts = {}) ⇒ Object

Unbinds queue from an exchange

Parameters:

  • exchange (Bunny::Exchange, String)

    Exchange to unbind from

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/bunny/queue.rb', line 169

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end
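
The recorded binding is forgotten only when exchange name, routing key and arguments all match what was stored. A plain-Ruby sketch of that matching rule, using a hypothetical forget_binding helper and hand-written binding hashes:

```ruby
bindings = [
  { :exchange => "logs", :routing_key => "info",  :arguments => nil },
  { :exchange => "logs", :routing_key => "error", :arguments => nil }
]

# Hypothetical helper reproducing the delete_if condition in Queue#unbind.
def forget_binding(bindings, exchange_name, opts = {})
  bindings.delete_if do |b|
    b[:exchange] == exchange_name &&
      b[:routing_key] == (opts[:routing_key] || opts[:key]) &&
      b[:arguments] == opts[:arguments]
  end
end

# Matches the first entry exactly, so it is removed.
forget_binding(bindings, "logs", :routing_key => "info")
# Arguments differ ({} versus nil), so the second entry is kept.
forget_binding(bindings, "logs", :routing_key => "error", :arguments => {})
```

In this sketch, unbinding with different :arguments than were used for binding leaves the stored entry in place, so it would be re-applied on recovery.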