Class: MarchHare::Queue
- Inherits:
-
Object
- Object
- MarchHare::Queue
- Defined in:
- lib/march_hare/queue.rb
Overview
Represents AMQP 0.9.1 queue.
Defined Under Namespace
Instance Attribute Summary collapse
-
#channel ⇒ MarchHare::Channel
readonly
Channel this queue uses.
-
#name ⇒ String
readonly
Queue name.
Class Method Summary collapse
-
.verify_type!(args0 = {}) ⇒ Object
Verifies that the "x-queue-type" argument, if present, names a supported queue type.
Instance Method Summary collapse
-
#arguments ⇒ Hash
Additional optional arguments (typically used by RabbitMQ extensions and plugins).
-
#auto_delete? ⇒ Boolean
True if this queue was declared as auto-delete (deleted as soon as its last consumer unsubscribes).
-
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange.
- #build_consumer(opts = {}, &block) ⇒ Object
-
#consumer_count ⇒ Integer
How many active consumers the queue has.
- #declare! ⇒ Object
-
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue.
-
#durable? ⇒ Boolean
True if this queue was declared as durable (will survive broker restart).
-
#exclusive? ⇒ Boolean
True if this queue was declared as exclusive (limited to just one consumer).
- #get(options = {}) ⇒ Object (also: #pop)
-
#initialize(channel, name, options = {}) ⇒ Queue
constructor
A new instance of Queue.
-
#message_count ⇒ Integer
How many messages the queue has ready (i.e. waiting to be delivered; messages that were delivered but not yet acknowledged are not counted).
-
#predefined? ⇒ Boolean
True if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange.
-
#purge ⇒ Object
Purges a queue (removes all messages from it).
- #recover_bindings ⇒ Object
- #recover_from_network_failure ⇒ Object
-
#server_named? ⇒ Boolean
True if this queue was declared as server named.
-
#status ⇒ Array<Integer>
A pair with information about the number of queue messages and consumers.
-
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
- #subscribe_with(consumer, opts = {}) ⇒ Object
-
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange.
- #verify_type!(args) ⇒ Object
Constructor Details
#initialize(channel, name, options = {}) ⇒ Queue
Returns a new instance of Queue.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/march_hare/queue.rb', line 52

# Sets up the local state of a queue. No broker call is made here.
#
# @param channel [MarchHare::Channel] channel this queue uses
# @param name [String] queue name; an empty name marks the queue as server-named
# @param options [Hash] queue properties, merged over the defaults below
# @option options [Boolean] :durable (false)
# @option options [Boolean] :exclusive (false)
# @option options [Boolean] :auto_delete (false)
# @option options [Boolean] :passive (false)
# @option options [Hash] :arguments ({}) optional arguments
# @option options [#to_s] :type queue type; falls back to the XArgs::QUEUE_TYPE
#   entry of :arguments, then to Types::CLASSIC
# @raise [ArgumentError] if name is not a String
def initialize(channel, name, options = {})
  raise ArgumentError, 'queue name must be a string' unless name.is_a? String

  @channel = channel
  @name = name
  @options = {:durable => false, :exclusive => false, :auto_delete => false, :passive => false, :arguments => Hash.new}.merge(options)

  # :arguments may have been passed explicitly as nil
  args = @options[:arguments] || {}
  @type = @options.fetch(:type, args.fetch(XArgs::QUEUE_TYPE, Types::CLASSIC)).to_s

  # Types::QUORUM and Types::STREAM queues are always treated as durable,
  # regardless of the :durable option.
  @durable = if @type == Types::QUORUM || @type == Types::STREAM
    true
  else
    @options[:durable]
  end
  @exclusive = @options[:exclusive]
  @server_named = @name.empty?
  @auto_delete = @options[:auto_delete]

  # The resolved type is the default; an explicit entry in :arguments wins.
  @arguments = if @type && !@type.empty?
    {XArgs::QUEUE_TYPE => @type}.merge(args)
  else
    @options[:arguments]
  end
  verify_type!(@arguments)

  @bindings = Set.new
end
Instance Attribute Details
#channel ⇒ MarchHare::Channel (readonly)
Returns Channel this queue uses.
36 37 38 |
# File 'lib/march_hare/queue.rb', line 36

# @return [MarchHare::Channel] Channel this queue uses
def channel
  @channel
end
#name ⇒ String (readonly)
Returns Queue name.
38 39 40 |
# File 'lib/march_hare/queue.rb', line 38

# @return [String] Queue name
def name
  @name
end
Class Method Details
.verify_type!(args0 = {}) ⇒ Object
Verifies that the "x-queue-type" argument, if present, names a supported queue type.
261 262 263 264 265 266 267 |
# File 'lib/march_hare/queue.rb', line 261

# Checks that the queue type named in the given arguments, if any, is one
# that Types.known? accepts.
#
# @param args0 [Hash, nil] queue arguments; nil is treated as an empty Hash
# @return [nil] when no type is given or the type is known
# @raise [ArgumentError] when "x-queue-type" (String or Symbol key) holds an
#   unknown type
def self.verify_type!(args0 = {})
  # be extra defensive
  args = args0 || {}
  q_type = args["x-queue-type"] || args[:"x-queue-type"]
  return unless q_type && !Types.known?(q_type)

  # Use raise, not throw: throw is catch/throw control flow and would surface
  # as UncaughtThrowError with the exception object as its tag, hiding the message.
  raise ArgumentError, "unsupported queue type #{q_type.inspect}, supported ones: #{Types::KNOWN.join(', ')}"
end
Instance Method Details
#arguments ⇒ Hash
Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).
108 109 110 |
# File 'lib/march_hare/queue.rb', line 108

# @return [Hash] Additional optional arguments (typically used by RabbitMQ
#   extensions and plugins)
def arguments
  @arguments
end
#auto_delete? ⇒ Boolean
Returns true if this queue was declared as auto-delete (deleted as soon as its last consumer unsubscribes).
97 98 99 |
# File 'lib/march_hare/queue.rb', line 97

# @return [Boolean] the :auto_delete setting this queue was created with
def auto_delete?
  @auto_delete
end
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/march_hare/queue.rb', line 122

# Binds queue to an exchange and remembers the binding for recovery.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or exchange name
# @param options [Hash] binding properties
# @option options [String] :routing_key routing key (:key is accepted as an alias)
# @option options [Hash] :arguments optional binding arguments
# @return [self]
def bind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  routing_key = options[:routing_key] || options[:key]

  @channel.queue_bind(@name, exchange_name, routing_key || "", options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  # The routing key is recorded as given (possibly nil), not as the ""
  # fallback that is sent to the channel.
  recorded = { :exchange => exchange_name, :routing_key => routing_key, :arguments => options[:arguments] }
  @bindings << recorded unless @bindings.include?(recorded)

  self
end
#build_consumer(opts = {}, &block) ⇒ Object
190 191 192 193 194 195 196 |
# File 'lib/march_hare/queue.rb', line 190

# Builds (but does not register or start) a consumer for this queue.
#
# @param opts [Hash] consumer options, passed through to the consumer
# @option opts [Boolean] :block (also :blocking) when truthy a
#   BlockingCallbackConsumer is built, otherwise a CallbackConsumer
# @option opts [Object] :buffer_size handed to BlockingCallbackConsumer only
# @param block [Proc] delivery callback handed to the consumer
# @return [BlockingCallbackConsumer, CallbackConsumer]
def build_consumer(opts = {}, &block)
  blocking = opts[:block] || opts[:blocking]
  return CallbackConsumer.new(@channel, self, opts, block) unless blocking

  BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
end
#consumer_count ⇒ Integer
Returns How many active consumers the queue has.
240 241 242 243 |
# File 'lib/march_hare/queue.rb', line 240

# Asks the channel for the queue's state via a passive declare.
#
# @return [Integer] How many active consumers the queue has
def consumer_count
  @channel.queue_declare_passive(@name).consumer_count
end
#declare! ⇒ Object
279 280 281 282 283 284 285 286 |
# File 'lib/march_hare/queue.rb', line 279

# Declares the queue on the channel and adopts the name found in the response.
# A passive declare is used when the :passive option is set.
#
# @return [String] the queue name taken from the declare response
def declare!
  @channel.logger.debug("queue: declare! #{@name}, type: #{@type}")

  declare_ok =
    if @options[:passive]
      @channel.queue_declare_passive(@name)
    else
      @channel.queue_declare(@name, @durable, @exclusive, @auto_delete, @arguments)
    end

  # The response name replaces @name (it differs from "" for server-named queues).
  @name = declare_ok.queue
end
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue
168 169 170 |
# File 'lib/march_hare/queue.rb', line 168

# Deletes the queue
#
# @param if_unused [Boolean] forwarded to the channel's queue_delete
# @param if_empty [Boolean] forwarded to the channel's queue_delete
# @return [Object] whatever the channel's queue_delete returns
def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(@name, if_unused, if_empty)
end
#durable? ⇒ Boolean
Returns true if this queue was declared as durable (will survive broker restart).
85 86 87 |
# File 'lib/march_hare/queue.rb', line 85

# @return [Boolean] true if this queue was declared as durable (will survive
#   broker restart)
def durable?
  @durable
end
#exclusive? ⇒ Boolean
Returns true if this queue was declared as exclusive (limited to just one consumer).
91 92 93 |
# File 'lib/march_hare/queue.rb', line 91

# @return [Boolean] true if this queue was declared as exclusive (limited to
#   just one consumer)
def exclusive?
  @exclusive
end
#get(options = {}) ⇒ Object Also known as: pop
179 180 181 182 183 184 185 186 187 |
# File 'lib/march_hare/queue.rb', line 179

# Fetches a single message from the queue via the channel's basic_get.
#
# @param options [Hash]
# @option options [Boolean] :ack (false) when truthy the message is fetched
#   in manual acknowledgement mode; :manual_ack is accepted as an alias,
#   mirroring #subscribe_with
# @return [Array, nil] a [Headers, String] pair, or nil when the channel
#   returned no response
def get(options = {})
  manual_ack = options[:ack] || options[:manual_ack]
  # basic_get takes the auto-ack flag, i.e. the negation of manual ack
  response = @channel.basic_get(@name, !manual_ack)
  return nil unless response

  [Headers.new(@channel, nil, response.envelope, response.props), String.from_java_bytes(response.body)]
end
#message_count ⇒ Integer
Returns how many messages the queue has ready (i.e. waiting to be delivered; messages that were delivered but not yet acknowledged are not counted).
234 235 236 237 |
# File 'lib/march_hare/queue.rb', line 234

# Asks the channel for the queue's state via a passive declare.
#
# @return [Integer] How many messages the queue has ready
def message_count
  response = @channel.queue_declare_passive(@name)
  response.message_count
end
#predefined? ⇒ Boolean
Returns true if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
274 275 276 |
# File 'lib/march_hare/queue.rb', line 274

# @return [Boolean] true if the queue name starts with "amq."
def predefined?
  @name.start_with?("amq.")
end
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish
250 251 252 253 254 |
# File 'lib/march_hare/queue.rb', line 250

# Publishes a message to the queue via default exchange. Takes the same
# arguments as Exchange#publish
#
# @param payload [Object] message body, handed to the exchange as is
# @param opts [Hash] publish options; :routing_key is always overridden with
#   this queue's name
# @return [self]
def publish(payload, opts = {})
  routed_opts = opts.merge(:routing_key => @name)
  @channel.default_exchange.publish(payload, routed_opts)

  self
end
#purge ⇒ Object
Purges a queue (removes all messages from it)
175 176 177 |
# File 'lib/march_hare/queue.rb', line 175

# Purges a queue (removes all messages from it)
#
# @return [Object] whatever the channel's queue_purge returns
def purge
  @channel.queue_purge(@name)
end
#recover_bindings ⇒ Object
304 305 306 307 308 309 |
# File 'lib/march_hare/queue.rb', line 304

# Re-applies every recorded binding by calling #bind again. Each recorded
# Hash doubles as the options Hash for #bind.
#
# @return [Set] the recorded bindings
def recover_bindings
  @bindings.each do |recorded|
    @channel.logger.debug("Recovering binding #{recorded.inspect}")
    bind(recorded[:exchange], recorded)
  end
end
#recover_from_network_failure ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/march_hare/queue.rb', line 289

# Re-establishes this queue after a connection loss: redeclares it (unless
# it is predefined), registers it with the channel again and replays the
# recorded bindings.
#
# @return [Object] the result of #recover_bindings
def recover_from_network_failure
  if server_named?
    # Reset the name to "" before redeclaring and drop the channel's
    # registration under the previous name.
    previous_name = @name.dup
    @name = ""

    @channel.deregister_queue_named(previous_name)
  end

  declare! unless predefined?

  @channel.register_queue(self)
  recover_bindings
end
#server_named? ⇒ Boolean
Returns true if this queue was declared as server named.
103 104 105 |
# File 'lib/march_hare/queue.rb', line 103

# @return [Boolean] true if this queue was declared as server named
def server_named?
  @server_named
end
#status ⇒ Array<Integer>
Returns A pair with information about the number of queue messages and consumers.
228 229 230 231 |
# File 'lib/march_hare/queue.rb', line 228

# Asks the channel for the queue's state via a single passive declare.
#
# @return [Array<Integer>] A pair with information about the number of queue
#   messages and consumers, in that order
def status
  response = @channel.queue_declare_passive(@name)
  [response.message_count, response.consumer_count]
end
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
210 211 212 |
# File 'lib/march_hare/queue.rb', line 210

# Adds a consumer to the queue (subscribes for message deliveries).
# The consumer is built with #build_consumer and attached with
# #subscribe_with; opts is passed to both.
#
# @param opts [Hash] consumer and subscription options
# @param block [Proc] delivery callback
# @return [Object] the result of #subscribe_with
def subscribe(opts = {}, &block)
  consumer = build_consumer(opts, &block)
  subscribe_with(consumer, opts)
end
#subscribe_with(consumer, opts = {}) ⇒ Object
214 215 216 217 218 219 220 221 222 223 |
# File 'lib/march_hare/queue.rb', line 214

# Attaches an already built consumer to this queue, registers it with the
# channel and starts it.
#
# @param consumer [Object] must respond to #consumer_tag= and #start
# @param opts [Hash]
# @option opts [Boolean] :ack (also :manual_ack) when truthy, auto-ack is
#   turned off
# @option opts [String] :consumer_tag tag passed to the channel's basic_consume
# @return [Object] the consumer that was passed in
def subscribe_with(consumer, opts = {})
  auto_ack = !(opts[:ack] || opts[:manual_ack])
  @consumer_tag = @channel.basic_consume(@name, auto_ack, opts[:consumer_tag], consumer)

  # The tag returned by the channel is the one recorded everywhere.
  consumer.consumer_tag = @consumer_tag
  @default_consumer = consumer
  @channel.register_consumer(@consumer_tag, consumer)

  consumer.start
  consumer
end
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange
148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/march_hare/queue.rb', line 148

# Unbinds queue from an exchange and forgets the matching recorded binding,
# so that it is not re-created by #recover_bindings.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or exchange name
# @param options [Hash] binding properties
# @option options [String] :routing_key ("") routing key (:key is accepted
#   as an alias, as in #bind)
# @option options [Hash] :arguments binding arguments, used to match the
#   recorded binding
# @return [self]
def unbind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  routing_key = options[:routing_key] || options[:key] || ""

  @channel.queue_unbind(@name, exchange_name, routing_key)

  # #bind records the routing key as given (possibly nil) while "" is what
  # goes over the wire, so compare on the normalised key. The previous guard
  # (`delete ... unless include?`) never removed anything.
  @bindings.delete_if do |recorded|
    recorded[:exchange] == exchange_name &&
      (recorded[:routing_key] || "") == routing_key &&
      recorded[:arguments] == options[:arguments]
  end

  self
end
#verify_type!(args) ⇒ Object
269 270 271 |
# File 'lib/march_hare/queue.rb', line 269

# Instance-level shortcut for the class-level verify_type! check.
#
# @param args [Hash, nil] queue arguments to validate
# @return [Object] whatever the class-level verify_type! returns
def verify_type!(args)
  self.class.verify_type!(args)
end