Class: MarchHare::Queue

Inherits:
Object
Defined in:
lib/march_hare/queue.rb

Overview

Represents an AMQP 0.9.1 queue.

Constructor Details

#initialize(channel, name, options = {}) ⇒ Queue

Returns a new instance of Queue.

Parameters:

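  • channel (MarchHare::Channel)

    Channel this queue will use.

  • name (String)

    Queue name. Pass an empty string to make RabbitMQ generate a unique one.

  • options (Hash) (defaults to: {})

    Queue properties: :durable, :exclusive, :auto_delete and :passive (each defaults to false) and :arguments (defaults to an empty Hash)

Raises:

  • (ArgumentError)

    if name is not a String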

# File 'lib/march_hare/queue.rb', line 31

def initialize(channel, name, options={})
  raise ArgumentError, 'queue name must be a string' unless name.is_a? String

  @channel = channel
  @name = name
  @options = {:durable => false, :exclusive => false, :auto_delete => false, :passive => false, :arguments => Hash.new}.merge(options)

  @durable      = @options[:durable]
  @exclusive    = @options[:exclusive]
  @server_named = @name.empty?
  @auto_delete  = @options[:auto_delete]
  @arguments    = @options[:arguments]

  @bindings     = Set.new
end
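
The constructor merges the caller's options over a fixed set of defaults and treats an empty name as a request for a server-generated one. The following is a minimal plain-Ruby sketch of that merge, runnable without a broker. `QUEUE_DEFAULTS`, `effective_queue_options` and `server_named_for?` are hypothetical names used only for illustration; they are not part of March Hare.

```ruby
# Hypothetical stand-in for the defaults hash built inside Queue#initialize.
QUEUE_DEFAULTS = {
  :durable     => false,
  :exclusive   => false,
  :auto_delete => false,
  :passive     => false,
  :arguments   => {}
}.freeze

# Hypothetical helper: keys supplied by the caller win over the defaults.
def effective_queue_options(options = {})
  QUEUE_DEFAULTS.merge(options)
end

# Hypothetical helper: an empty name marks the queue as server-named.
def server_named_for?(name)
  raise ArgumentError, 'queue name must be a string' unless name.is_a?(String)
  name.empty?
end

effective_queue_options(:durable => true)  # :durable is true, the rest keep their defaults
server_named_for?("")                      # true
```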

Instance Attribute Details

#channel ⇒ MarchHare::Channel (readonly)

Returns Channel this queue uses.

Returns:

  • (MarchHare::Channel)

    Channel this queue uses

# File 'lib/march_hare/queue.rb', line 15

def channel
  @channel
end

#name ⇒ String (readonly)

Returns Queue name.

Returns:

  • (String)

    Queue name



# File 'lib/march_hare/queue.rb', line 17

def name
  @name
end

Instance Method Details

#arguments ⇒ Hash

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



# File 'lib/march_hare/queue.rb', line 73

def arguments
  @arguments
end

#auto_delete? ⇒ Boolean

Returns true if this queue was declared as automatically deleted (deleted as soon as its last consumer is cancelled).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as its last consumer is cancelled).

# File 'lib/march_hare/queue.rb', line 62

def auto_delete?
  @auto_delete
end

#bind(exchange, options = {}) ⇒ Object

Binds queue to an exchange

Parameters:

  • exchange (MarchHare::Exchange, String)

    Exchange to bind to

  • options (Hash) (defaults to: {})

    Binding properties

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/march_hare/queue.rb', line 89

def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then
                    exchange.name
                  else
                    exchange.to_s
                  end
  @channel.queue_bind(@name, exchange_name, (options[:routing_key] || options[:key] || ""), options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (options[:routing_key] || options[:key]), :arguments => options[:arguments] }
  @bindings << binding unless @bindings.include?(binding)

  self
end
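
#bind accepts either an exchange object or a plain name, and either :routing_key or :key for the routing key. As a rough sketch of those two fallbacks in isolation (`FakeExchange`, `resolve_exchange_name` and `resolve_routing_key` are hypothetical names, not March Hare API):

```ruby
# Hypothetical stand-in for an exchange object; only #name matters here.
FakeExchange = Struct.new(:name)

# Mirrors the branch at the top of #bind: use #name when the argument
# responds to it, otherwise fall back to its string form.
def resolve_exchange_name(exchange)
  exchange.respond_to?(:name) ? exchange.name : exchange.to_s
end

# Mirrors the routing key passed to queue_bind: :routing_key first,
# then :key, then an empty string.
def resolve_routing_key(options = {})
  options[:routing_key] || options[:key] || ""
end

resolve_exchange_name(FakeExchange.new("logs"))  # "logs"
resolve_exchange_name("logs")                    # "logs"
resolve_routing_key(:key => "info")              # "info"
```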

#build_consumer(opts = {}, &block) ⇒ Object



# File 'lib/march_hare/queue.rb', line 157

def build_consumer(opts = {}, &block)
  if opts[:block] || opts[:blocking]
    BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
  else
    CallbackConsumer.new(@channel, self, opts, block)
  end
end

#consumer_count ⇒ Integer

Returns How many active consumers the queue has.

Returns:

  • (Integer)

    How many active consumers the queue has



# File 'lib/march_hare/queue.rb', line 208

def consumer_count
  response = @channel.queue_declare_passive(@name)
  response.consumer_count
end

#declare! ⇒ Object



# File 'lib/march_hare/queue.rb', line 235

def declare!
  response = if @options[:passive]
               @channel.queue_declare_passive(@name)
             else
               @channel.queue_declare(@name, @options[:durable], @options[:exclusive], @options[:auto_delete], @options[:arguments])
             end
  @name = response.queue
end

#delete(if_unused = false, if_empty = false) ⇒ Object

Deletes the queue

Parameters:

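  • if_unused (Boolean) (defaults to: false)

    Should the queue be deleted only if it has no consumers?

  • if_empty (Boolean) (defaults to: false)

    Should the queue be deleted only if it has no messages?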

# File 'lib/march_hare/queue.rb', line 135

def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(@name, if_unused, if_empty)
end

#durable? ⇒ Boolean

Returns true if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).

# File 'lib/march_hare/queue.rb', line 50

def durable?
  @durable
end

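#exclusive? ⇒ Boolean

Returns true if this queue was declared as exclusive (usable only by the connection that declared it and deleted when that connection closes).

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (usable only by the connection that declared it and deleted when that connection closes)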

# File 'lib/march_hare/queue.rb', line 56

def exclusive?
  @exclusive
end

#get(options = {:block => false}) ⇒ Object Also known as: pop



# File 'lib/march_hare/queue.rb', line 146

def get(options = {:block => false})
  response = @channel.basic_get(@name, !options.fetch(:ack, false))

  if response
    [Headers.new(@channel, nil, response.envelope, response.props), String.from_java_bytes(response.body)]
  else
    nil
  end
end
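
#get returns a two-element array of metadata and payload, or nil when the queue has nothing ready, so callers usually destructure the result after a nil check. A small sketch of that pattern follows. It assumes a hypothetical `FakeQueue` whose #get has the same return shape; the metadata here is a plain Hash placeholder rather than the Headers object built above.

```ruby
# Hypothetical in-memory queue; #get returns [metadata, payload] or nil,
# matching the return shape of the method above.
class FakeQueue
  def initialize(messages)
    @messages = messages.dup
  end

  def get
    payload = @messages.shift
    payload && [{ :placeholder_metadata => true }, payload]
  end
end

# Polls until #get returns nil and collects the payloads in order.
def drain(queue)
  payloads = []
  while (delivery = queue.get)
    _metadata, payload = delivery
    payloads << payload
  end
  payloads
end

drain(FakeQueue.new(["a", "b"]))  # ["a", "b"]
```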

#message_count ⇒ Integer

Returns How many messages the queue has ready (i.e. waiting for delivery, not counting delivered but unacknowledged messages).

Returns:

  • (Integer)

    How many messages the queue has ready (i.e. waiting for delivery, not counting delivered but unacknowledged messages)



# File 'lib/march_hare/queue.rb', line 202

def message_count
  response = @channel.queue_declare_passive(@name)
  response.message_count
end

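#predefined? ⇒ Boolean

Returns true if this queue's name starts with the reserved "amq." prefix (the prefix used by pre-defined entities such as amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this queue's name starts with the reserved "amq." prefix (the prefix used by pre-defined entities such as amq.direct, amq.fanout, amq.match and so on)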



# File 'lib/march_hare/queue.rb', line 230

def predefined?
  @name.start_with?("amq.")
end

#publish(payload, opts = {}) ⇒ Object

Publishes a message to the queue via the default exchange, using the queue name as the routing key. Takes the same arguments as Exchange#publish.



# File 'lib/march_hare/queue.rb', line 218

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end
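
Because the options are merged with `:routing_key => @name` last, a :routing_key supplied by the caller is replaced by the queue name while all other options pass through. A sketch of that behaviour with a hypothetical recording exchange in place of `channel.default_exchange` (`RecordingExchange` and `publish_to_queue` are illustrative names only):

```ruby
# Hypothetical exchange that records what it was asked to publish.
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << [payload, opts]
    self
  end
end

# Mirrors the merge in Queue#publish: the queue name always wins
# as the routing key; other options are forwarded unchanged.
def publish_to_queue(exchange, queue_name, payload, opts = {})
  exchange.publish(payload, opts.merge(:routing_key => queue_name))
end

exchange = RecordingExchange.new
publish_to_queue(exchange, "jobs", "hello", :routing_key => "ignored", :persistent => true)
exchange.published.last  # ["hello", {:routing_key => "jobs", :persistent => true}]
```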

#purge ⇒ Object

Purges a queue (removes all messages from it)



# File 'lib/march_hare/queue.rb', line 142

def purge
  @channel.queue_purge(@name)
end

#recover_bindings ⇒ Object



# File 'lib/march_hare/queue.rb', line 259

def recover_bindings
  @bindings.each do |b|
    # TODO: use a logger
    # puts "Recovering binding #{b.inspect}"
    self.bind(b[:exchange], b)
  end
end

#recover_from_network_failure ⇒ Object



# File 'lib/march_hare/queue.rb', line 244

def recover_from_network_failure
  if self.server_named?
    old_name = @name.dup
    @name    = ""

    @channel.deregister_queue_named(old_name)
  end

  declare! if !predefined?

  @channel.register_queue(self)
  recover_bindings
end
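
The order of operations above matters for server-named queues: the old generated name is dropped and reset to an empty string before the predefined? check, so the queue is redeclared and picks up a fresh name. The sketch below walks through that sequence with hypothetical in-memory stand-ins. `FakeChannel` and `SketchQueue` are illustrative only, the "amq.gen-N" names are made up for the example, and `queue_declare` here returns the name directly rather than a response object.

```ruby
# Hypothetical in-memory channel; models only what the recovery path touches.
class FakeChannel
  attr_reader :queues

  def initialize
    @queues  = {}
    @counter = 0
  end

  # Returns the declared name, generating one when given "".
  def queue_declare(name)
    name.empty? ? "amq.gen-#{@counter += 1}" : name
  end

  def register_queue(queue)
    @queues[queue.name] = queue
  end

  def deregister_queue_named(name)
    @queues.delete(name)
  end
end

# Hypothetical trimmed-down queue that follows the same recovery order.
class SketchQueue
  attr_reader :name

  def initialize(channel, name)
    @channel      = channel
    @server_named = name.empty?
    @name         = @channel.queue_declare(name)
    @channel.register_queue(self)
  end

  def recover
    if @server_named
      @channel.deregister_queue_named(@name)
      @name = ""  # reset first, so the prefix check below does not skip it
    end
    @name = @channel.queue_declare(@name) unless @name.start_with?("amq.")
    @channel.register_queue(self)
  end
end

channel = FakeChannel.new
queue   = SketchQueue.new(channel, "")
queue.recover
queue.name  # a freshly generated name; the old one is no longer registered
```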

#server_named? ⇒ Boolean

Returns true if this queue was declared as server named.

Returns:

  • (Boolean)

    true if this queue was declared as server named.

# File 'lib/march_hare/queue.rb', line 68

def server_named?
  @server_named
end

#status ⇒ Array<Integer>

Returns A pair with information about the number of queue messages and consumers.

Returns:

  • (Array<Integer>)

    A pair with information about the number of queue messages and consumers

# File 'lib/march_hare/queue.rb', line 196

def status
  response = @channel.queue_declare_passive(@name)
  [response.message_count, response.consumer_count]
end

#subscribe(opts = {}, &block) ⇒ Object

Adds a consumer to the queue (subscribes for message deliveries).

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block the calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let MarchHare generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

# File 'lib/march_hare/queue.rb', line 178

def subscribe(opts = {}, &block)
  subscribe_with(build_consumer(opts, &block), opts)
end

#subscribe_with(consumer, opts = {}) ⇒ Object



# File 'lib/march_hare/queue.rb', line 182

def subscribe_with(consumer, opts = {})
  @consumer_tag = @channel.basic_consume(@name, !(opts[:ack] || opts[:manual_ack]), opts[:consumer_tag], consumer)
  consumer.consumer_tag = @consumer_tag

  @default_consumer = consumer
  @channel.register_consumer(@consumer_tag, consumer)

  consumer.start
  consumer
end
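
The second argument passed to basic_consume is the negation of the :ack and :manual_ack options, which suggests it acts as an automatic acknowledgement flag: it is true unless the caller asks for manual acknowledgements under either key. A sketch of just that expression (`auto_ack?` is a hypothetical helper name, not part of March Hare):

```ruby
# Mirrors the flag computed in #subscribe_with.
def auto_ack?(opts = {})
  !(opts[:ack] || opts[:manual_ack])
end

auto_ack?                       # true
auto_ack?(:manual_ack => true)  # false
auto_ack?(:ack => true)         # false
```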

#unbind(exchange, options = {}) ⇒ Object

Unbinds queue from an exchange

Parameters:

  • exchange (MarchHare::Exchange, String)

    Exchange to unbind from

  • options (Hash) (defaults to: {})

    Binding properties

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/march_hare/queue.rb', line 115

def unbind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then
                    exchange.name
                  else
                    exchange.to_s
                  end
  # accept both :routing_key and :key, as #bind does
  routing_key = options[:routing_key] || options[:key]
  @channel.queue_unbind(@name, exchange_name, routing_key || "")

  # build the record exactly as #bind stores it (the routing key may be nil)
  # so that the stored binding is found and removed
  binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => options[:arguments] }
  @bindings.delete(binding)

  self
end
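
Binding records are plain hashes kept in a Set, so membership and removal are decided by value: a record is found only if the exchange, routing key and arguments all match, and a nil routing key does not match an empty string. A short sketch with Ruby's standard Set (`binding_record` is a hypothetical helper that builds hashes of the same shape):

```ruby
require "set"

# Hypothetical helper building a record shaped like the ones the queue stores.
def binding_record(exchange_name, routing_key, arguments = nil)
  { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments }
end

bindings = Set.new
bindings << binding_record("logs", "info")
bindings << binding_record("logs", "info")       # equal by value: still one entry
bindings.delete(binding_record("logs", ""))      # different routing key: nothing removed
bindings.delete(binding_record("logs", "info"))  # exact match: entry removed
bindings.empty?                                  # true
```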