Class: MarchHare::Queue
- Inherits:
-
Object
- Object
- MarchHare::Queue
- Defined in:
- lib/march_hare/queue.rb
Overview
Represents AMQP 0.9.1 queue.
Defined Under Namespace
Instance Attribute Summary collapse
-
#channel ⇒ MarchHare::Channel
readonly
Channel this queue uses.
-
#name ⇒ String
readonly
Queue name.
Class Method Summary collapse
-
.verify_type!(args0 = {}) ⇒ Object
Implementation.
Instance Method Summary collapse
-
#arguments ⇒ Hash
Additional optional arguments (typically used by RabbitMQ extensions and plugins).
-
#auto_delete? ⇒ Boolean
True if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).
-
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange.
- #build_consumer(opts = {}, &block) ⇒ Object
-
#consumer_count ⇒ Integer
How many active consumers the queue has.
- #declare! ⇒ Object
-
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue.
-
#durable? ⇒ Boolean
True if this queue was declared as durable (will survive broker restart).
-
#exclusive? ⇒ Boolean
True if this queue was declared as exclusive (limited to just one consumer).
- #get(options = {}) ⇒ Object (also: #pop)
-
#initialize(channel, name, options = {}) ⇒ Queue
constructor
A new instance of Queue.
-
#message_count ⇒ Integer
How many messages the queue has ready (i.e. neither delivered to a consumer nor awaiting acknowledgement).
-
#predefined? ⇒ Boolean
True if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange.
-
#purge ⇒ Object
Purges a queue (removes all messages from it).
- #recover_bindings ⇒ Object
- #recover_from_network_failure ⇒ Object
-
#server_named? ⇒ Boolean
True if this queue was declared as server named.
-
#status ⇒ Array<Integer>
A pair with information about the number of queue messages and consumers.
-
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
- #subscribe_with(consumer, opts = {}) ⇒ Object
-
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange.
- #verify_type!(args) ⇒ Object
Constructor Details
#initialize(channel, name, options = {}) ⇒ Queue
Returns a new instance of Queue.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/march_hare/queue.rb', line 52

# Sets up the instance state of a queue that uses the given channel.
#
# @param channel [MarchHare::Channel] channel this queue uses
# @param name [String] queue name; an empty name marks the queue as server-named
# @param options [Hash] overrides for the defaults merged below
#   (:durable, :exclusive, :auto_delete, :passive, :arguments, :type)
# @raise [ArgumentError] if name is not a String
def initialize(channel, name, options = {})
  raise ArgumentError, 'queue name must be a string' unless name.is_a? String

  @channel = channel
  @name = name
  @options = {:durable => false, :exclusive => false, :auto_delete => false, :passive => false, :arguments => Hash.new, :type => Types::CLASSIC}.merge(options)

  @type = @options[:type].to_s
  @durable = @options[:durable]
  @exclusive = @options[:exclusive]
  @server_named = @name.empty?
  @auto_delete = @options[:auto_delete]
  # @type is always a String here (to_s above), so only blankness matters:
  # a non-blank type is folded into the optional arguments.
  @arguments = if @type.empty?
                 @options[:arguments]
               else
                 (@options[:arguments] || {}).merge({XArgs::QUEUE_TYPE => @type})
               end
  verify_type!(@arguments)

  @bindings = Set.new
end
Instance Attribute Details
#channel ⇒ MarchHare::Channel (readonly)
Returns Channel this queue uses.
36 37 38 |
# File 'lib/march_hare/queue.rb', line 36

# Read-only accessor for the channel.
#
# @return [MarchHare::Channel] channel this queue uses
def channel
  @channel
end
#name ⇒ String (readonly)
Returns Queue name.
38 39 40 |
# File 'lib/march_hare/queue.rb', line 38

# Read-only accessor for the queue name.
#
# @return [String] queue name
def name
  @name
end
Class Method Details
.verify_type!(args0 = {}) ⇒ Object
Implementation
254 255 256 257 258 259 260 |
# File 'lib/march_hare/queue.rb', line 254

# Checks that the queue type named in the given arguments, if any, is a
# known one.
#
# The type is looked up under the "x-queue-type" key, first as a String and
# then as a Symbol. Arguments without a queue type are accepted.
#
# @param args0 [Hash, nil] optional queue arguments; nil is treated as empty
# @return [nil]
# @raise [ArgumentError] if a queue type is present and Types.known? rejects it
def self.verify_type!(args0 = {})
  # be extra defensive
  args = args0 || {}
  q_type = args["x-queue-type"] || args[:"x-queue-type"]
  return unless q_type && !Types.known?(q_type)

  # Use raise, not throw: Kernel#throw is catch/throw control flow and would
  # surface as an UncaughtThrowError instead of the intended exception.
  raise ArgumentError, "unsupported queue type #{q_type.inspect}, supported ones: #{Types::KNOWN.join(', ')}"
end
Instance Method Details
#arguments ⇒ Hash
Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).
101 102 103 |
# File 'lib/march_hare/queue.rb', line 101

# Read-only accessor for the optional queue arguments.
#
# @return [Hash] additional optional arguments (typically used by RabbitMQ
#   extensions and plugins)
def arguments
  @arguments
end
#auto_delete? ⇒ Boolean
Returns true if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).
90 91 92 |
# File 'lib/march_hare/queue.rb', line 90

# Reports the auto-delete flag this queue object holds.
#
# @return [Boolean] true if this queue was declared as automatically deleted
def auto_delete?
  @auto_delete
end
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/march_hare/queue.rb', line 115

# Binds queue to an exchange and remembers the binding.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or an exchange name
# @param options [Hash] binding options
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias, and an empty string is used when neither is given
# @option options [Hash] :arguments passed through to the channel's queue_bind
# @return [self]
def bind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  # Resolve the routing key once so the recorded binding carries exactly the
  # value handed to the channel (a missing key is recorded as "", not nil).
  routing_key = options[:routing_key] || options[:key] || ""
  @channel.queue_bind(@name, exchange_name, routing_key, options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  record = { :exchange => exchange_name, :routing_key => routing_key, :arguments => options[:arguments] }
  @bindings << record unless @bindings.include?(record)

  self
end
#build_consumer(opts = {}, &block) ⇒ Object
183 184 185 186 187 188 189 |
# File 'lib/march_hare/queue.rb', line 183

# Builds a consumer for this queue without subscribing it.
#
# @param opts [Hash] consumer options; a truthy :block or :blocking selects
#   BlockingCallbackConsumer (which also receives opts[:buffer_size]),
#   otherwise a CallbackConsumer is built
# @param block [Proc] handler passed to the consumer as its last argument
# @return [BlockingCallbackConsumer, CallbackConsumer]
def build_consumer(opts = {}, &block)
  blocking = opts[:block] || opts[:blocking]
  return CallbackConsumer.new(@channel, self, opts, block) unless blocking

  BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
end
#consumer_count ⇒ Integer
Returns How many active consumers the queue has.
233 234 235 236 |
# File 'lib/march_hare/queue.rb', line 233

# Asks the channel for a passive declaration of this queue and reads the
# consumer count from the response.
#
# @return [Integer] how many active consumers the queue has
def consumer_count
  @channel.queue_declare_passive(@name).consumer_count
end
#declare! ⇒ Object
272 273 274 275 276 277 278 279 |
# File 'lib/march_hare/queue.rb', line 272

# Declares the queue through the channel and adopts the name reported back.
#
# A passive declaration is used when the :passive option is set; otherwise
# the queue is declared with its durable, exclusive, auto-delete and
# arguments settings.
#
# @return [Object] the queue name taken from the declaration response
def declare!
  @channel.logger.debug("queue: declare! #{@name}, type: #{@type}")

  declare_ok =
    if @options[:passive]
      @channel.queue_declare_passive(@name)
    else
      @channel.queue_declare(@name, @durable, @exclusive, @auto_delete, @arguments)
    end

  @name = declare_ok.queue
end
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue
161 162 163 |
# File 'lib/march_hare/queue.rb', line 161

# Deletes the queue by delegating to the channel's queue_delete.
#
# @param if_unused [Boolean] forwarded unchanged to queue_delete
# @param if_empty [Boolean] forwarded unchanged to queue_delete
# @return [Object] whatever the channel's queue_delete returns
def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(@name, if_unused, if_empty)
end
#durable? ⇒ Boolean
Returns true if this queue was declared as durable (will survive broker restart).
78 79 80 |
# File 'lib/march_hare/queue.rb', line 78

# Reports the durable flag this queue object holds.
#
# @return [Boolean] true if this queue was declared as durable
def durable?
  @durable
end
#exclusive? ⇒ Boolean
Returns true if this queue was declared as exclusive (limited to just one consumer).
84 85 86 |
# File 'lib/march_hare/queue.rb', line 84

# Reports the exclusive flag this queue object holds.
#
# @return [Boolean] true if this queue was declared as exclusive
def exclusive?
  @exclusive
end
#get(options = {}) ⇒ Object Also known as: pop
172 173 174 175 176 177 178 179 180 |
# File 'lib/march_hare/queue.rb', line 172

# Fetches a single message from the queue via the channel's basic_get.
#
# @param options [Hash] fetch options
# @option options [Boolean] :ack (false) the negation of this value is passed
#   as the second argument of basic_get
# @return [Array, nil] a [Headers, String] pair built from the response, or
#   nil when basic_get returns nothing
def get(options = {})
  response = @channel.basic_get(@name, !options.fetch(:ack, false))
  return nil unless response

  [Headers.new(@channel, nil, response.envelope, response.props), String.from_java_bytes(response.body)]
end
#message_count ⇒ Integer
Returns How many messages the queue has ready (i.e. neither delivered to a consumer nor awaiting acknowledgement).
227 228 229 230 |
# File 'lib/march_hare/queue.rb', line 227

# Asks the channel for a passive declaration of this queue and reads the
# message count from the response.
#
# @return [Integer] how many messages the queue has ready
def message_count
  response = @channel.queue_declare_passive(@name)
  response.message_count
end
#predefined? ⇒ Boolean
Returns true if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
267 268 269 |
# File 'lib/march_hare/queue.rb', line 267

# Tells whether the queue name carries the "amq." prefix.
#
# @return [Boolean] true if this queue is a pre-defined one
def predefined?
  reserved_prefix = "amq."
  @name.start_with?(reserved_prefix)
end
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish
243 244 245 246 247 |
# File 'lib/march_hare/queue.rb', line 243

# Publishes a message to the queue via the channel's default exchange, using
# the queue name as the routing key. Takes the same arguments as
# Exchange#publish.
#
# @param payload [Object] message body handed to the exchange
# @param opts [Hash] publishing options; any :routing_key given here is
#   overridden with the queue name
# @return [self]
def publish(payload, opts = {})
  routed_opts = opts.merge(:routing_key => @name)
  @channel.default_exchange.publish(payload, routed_opts)

  self
end
#purge ⇒ Object
Purges a queue (removes all messages from it)
168 169 170 |
# File 'lib/march_hare/queue.rb', line 168

# Purges a queue (removes all messages from it) by delegating to the
# channel's queue_purge.
#
# @return [Object] whatever the channel's queue_purge returns
def purge
  @channel.queue_purge(@name)
end
#recover_bindings ⇒ Object
297 298 299 300 301 302 |
# File 'lib/march_hare/queue.rb', line 297

# Re-applies every recorded binding by calling #bind with the recorded
# exchange name and the whole record as options, logging each one first.
#
# @return [Object] the collection of recorded bindings
def recover_bindings
  @bindings.each do |recorded|
    @channel.logger.debug("Recovering binding #{recorded.inspect}")
    bind(recorded[:exchange], recorded)
  end
end
#recover_from_network_failure ⇒ Object
282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/march_hare/queue.rb', line 282

# Restores this queue after a network failure: redeclares it (unless it is
# pre-defined), registers it with the channel again and recovers its bindings.
#
# A server-named queue first drops its old name and deregisters it from the
# channel, so the redeclaration starts from an empty name.
#
# @return [Object] whatever #recover_bindings returns
def recover_from_network_failure
  if server_named?
    stale_name = @name.dup
    @name = ""

    @channel.deregister_queue_named(stale_name)
  end

  declare! unless predefined?

  @channel.register_queue(self)
  recover_bindings
end
#server_named? ⇒ Boolean
Returns true if this queue was declared as server named.
96 97 98 |
# File 'lib/march_hare/queue.rb', line 96

# Reports the server-named flag this queue object holds.
#
# @return [Boolean] true if this queue was declared as server named
def server_named?
  @server_named
end
#status ⇒ Array<Integer>
Returns A pair with information about the number of queue messages and consumers.
221 222 223 224 |
# File 'lib/march_hare/queue.rb', line 221

# Asks the channel for a passive declaration of this queue and reports both
# counters from the single response.
#
# @return [Array<Integer>] a pair: number of queue messages, then number of
#   consumers
def status
  response = @channel.queue_declare_passive(@name)
  [response.message_count, response.consumer_count]
end
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
203 204 205 |
# File 'lib/march_hare/queue.rb', line 203

# Adds a consumer to the queue (subscribes for message deliveries).
#
# The consumer is built with #build_consumer from the given options and
# block, then registered with #subscribe_with using the same options.
#
# @param opts [Hash] options shared by #build_consumer and #subscribe_with
# @param block [Proc] handler given to the consumer
# @return [Object] whatever #subscribe_with returns
def subscribe(opts = {}, &block)
  consumer = build_consumer(opts, &block)
  subscribe_with(consumer, opts)
end
#subscribe_with(consumer, opts = {}) ⇒ Object
207 208 209 210 211 212 213 214 215 216 |
# File 'lib/march_hare/queue.rb', line 207

# Subscribes an already-built consumer to this queue.
#
# The consumer is handed to the channel's basic_consume, tagged with the
# returned consumer tag, remembered as the default consumer, registered with
# the channel and finally started.
#
# @param consumer [Object] consumer responding to #consumer_tag= and #start
# @param opts [Hash] subscription options
# @option opts [Boolean] :ack a truthy :ack or :manual_ack makes the second
#   basic_consume argument false; otherwise it is true
# @option opts [String] :consumer_tag passed through to basic_consume
# @return [Object] the consumer
def subscribe_with(consumer, opts = {})
  auto_ack = !(opts[:ack] || opts[:manual_ack])
  requested_tag = opts[:consumer_tag]

  @consumer_tag = @channel.basic_consume(@name, auto_ack, requested_tag, consumer)
  consumer.consumer_tag = @consumer_tag
  @default_consumer = consumer

  @channel.register_consumer(@consumer_tag, consumer)
  consumer.start

  consumer
end
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange
141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/march_hare/queue.rb', line 141

# Unbinds queue from an exchange and forgets the matching recorded binding.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or an exchange name
# @param options [Hash] binding options
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias, and an empty string is used when neither is given
# @option options [Hash] :arguments used only to identify the recorded binding
# @return [self]
def unbind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  # Resolve the routing key once so the channel call and the binding record
  # agree, including when the caller used the :key alias.
  routing_key = options[:routing_key] || options[:key] || ""
  @channel.queue_unbind(@name, exchange_name, routing_key)

  record = { :exchange => exchange_name, :routing_key => routing_key, :arguments => options[:arguments] }
  # Deleting a record that is not present is a no-op, so no membership guard
  # is needed (the previous `unless include?` guard prevented every removal).
  @bindings.delete(record)

  self
end
#verify_type!(args) ⇒ Object
262 263 264 |
# File 'lib/march_hare/queue.rb', line 262

# Instance-level convenience that forwards to the class-level verify_type!.
#
# @param args [Hash, nil] optional queue arguments to check
# @return [Object] whatever the class-level verify_type! returns
def verify_type!(args)
  owner = self.class
  owner.verify_type!(args)
end