Class: MarchHare::Queue
- Inherits:
-
Object
- Object
- MarchHare::Queue
- Defined in:
- lib/march_hare/queue.rb
Overview
Represents AMQP 0.9.1 queue.
Instance Attribute Summary collapse
-
#channel ⇒ MarchHare::Channel
readonly
Channel this queue uses.
-
#name ⇒ String
readonly
Queue name.
Instance Method Summary collapse
-
#arguments ⇒ Hash
Additional optional arguments (typically used by RabbitMQ extensions and plugins).
-
#auto_delete? ⇒ Boolean
True if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
-
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange.
- #build_consumer(opts = {}, &block) ⇒ Object
-
#consumer_count ⇒ Integer
How many active consumers the queue has.
- #declare! ⇒ Object
-
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue.
-
#durable? ⇒ Boolean
True if this queue was declared as durable (will survive broker restart).
-
#exclusive? ⇒ Boolean
True if this queue was declared as exclusive (limited to just one consumer).
- #get(options = {:block => false}) ⇒ Object (also: #pop)
-
#initialize(channel, name, options = {}) ⇒ Queue
constructor
A new instance of Queue.
-
#message_count ⇒ Integer
How many messages the queue has ready (i.e. not yet delivered to a consumer, as opposed to delivered but unacknowledged).
-
#predefined? ⇒ Boolean
True if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange.
-
#purge ⇒ Object
Purges a queue (removes all messages from it).
- #recover_bindings ⇒ Object
- #recover_from_network_failure ⇒ Object
-
#server_named? ⇒ Boolean
True if this queue was declared as server named.
-
#status ⇒ Array<Integer>
A pair with information about the number of queue messages and consumers.
-
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
- #subscribe_with(consumer, opts = {}) ⇒ Object
-
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange.
Constructor Details
#initialize(channel, name, options = {}) ⇒ Queue
Returns a new instance of Queue.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/march_hare/queue.rb', line 31
# Builds a queue object. Only local state is set up here; nothing is sent
# to the channel by this method.
#
# @param channel [MarchHare::Channel] channel this queue uses
# @param name [String] queue name; an empty string marks the queue as server named
# @param options [Hash] queue attributes, merged over the defaults below
# @option options [Boolean] :durable (false)
# @option options [Boolean] :exclusive (false)
# @option options [Boolean] :auto_delete (false)
# @option options [Boolean] :passive (false)
# @option options [Hash] :arguments ({}) additional optional arguments
# @raise [ArgumentError] if name is not a String
def initialize(channel, name, options = {})
  raise ArgumentError, 'queue name must be a string' unless name.is_a?(String)

  @channel = channel
  @name    = name
  # merge returns a new Hash, so the caller's options are left untouched.
  @options = {
    :durable     => false,
    :exclusive   => false,
    :auto_delete => false,
    :passive     => false,
    :arguments   => {}
  }.merge(options)

  @durable      = @options[:durable]
  @exclusive    = @options[:exclusive]
  @server_named = @name.empty?
  @auto_delete  = @options[:auto_delete]
  @arguments    = @options[:arguments]

  # Bindings recorded by #bind; replayed by #recover_bindings.
  @bindings = Set.new
end
Instance Attribute Details
#channel ⇒ MarchHare::Channel (readonly)
Returns Channel this queue uses.
15 16 17 |
# File 'lib/march_hare/queue.rb', line 15
# Reader for the channel this queue was created with.
#
# @return [MarchHare::Channel] channel this queue uses
def channel
  @channel
end
#name ⇒ String (readonly)
Returns Queue name.
17 18 19 |
# File 'lib/march_hare/queue.rb', line 17
# Reader for the queue name.
#
# @return [String] queue name
def name
  @name
end
Instance Method Details
#arguments ⇒ Hash
Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).
73 74 75 |
# File 'lib/march_hare/queue.rb', line 73
# Reader for the queue's additional arguments.
#
# @return [Hash] additional optional arguments (typically used by RabbitMQ
#   extensions and plugins)
def arguments
  @arguments
end
#auto_delete? ⇒ Boolean
Returns true if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
62 63 64 |
# File 'lib/march_hare/queue.rb', line 62
# Reports the auto-delete flag held in @auto_delete.
#
# @return [Boolean] true if this queue was declared as automatically deleted
def auto_delete?
  @auto_delete
end
#bind(exchange, options = {}) ⇒ Object
Binds queue to an exchange
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/march_hare/queue.rb', line 89
# Binds queue to an exchange and records the binding for automatic recovery.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   +name+) or an exchange name
# @param options [Hash] binding options
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias, and an empty string is used when neither is given
# @option options [Hash] :arguments additional binding arguments
# @return [self]
def bind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  routing_key   = options[:routing_key] || options[:key] || ""
  @channel.queue_bind(@name, exchange_name, routing_key, options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  # The recorded routing key is the one actually sent above, so binding with
  # no key and binding with an explicit "" key are recorded as one binding.
  recorded = {
    :exchange    => exchange_name,
    :routing_key => routing_key,
    :arguments   => options[:arguments]
  }
  @bindings << recorded unless @bindings.include?(recorded)

  self
end
#build_consumer(opts = {}, &block) ⇒ Object
157 158 159 160 161 162 163 |
# File 'lib/march_hare/queue.rb', line 157
# Instantiates a consumer for this queue without subscribing it.
#
# @param opts [Hash] consumer options, passed through to the consumer
# @option opts [Boolean] :block when truthy, a BlockingCallbackConsumer is built
#   (:blocking is accepted as an alias)
# @option opts [Object] :buffer_size forwarded to BlockingCallbackConsumer only
# @param block [Proc] delivery handler handed to the consumer (nil if no block given)
# @return [CallbackConsumer, BlockingCallbackConsumer]
def build_consumer(opts = {}, &block)
  wants_blocking = opts[:block] || opts[:blocking]
  return CallbackConsumer.new(@channel, self, opts, block) unless wants_blocking

  BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
end
#consumer_count ⇒ Integer
Returns How many active consumers the queue has.
208 209 210 211 |
# File 'lib/march_hare/queue.rb', line 208
# Asks the channel for a passive declare of this queue and reads the
# consumer count off the response.
#
# @return [Integer] how many active consumers the queue has
def consumer_count
  @channel.queue_declare_passive(@name).consumer_count
end
#declare! ⇒ Object
235 236 237 238 239 240 241 |
# File 'lib/march_hare/queue.rb', line 235
# Declares the queue through the channel and adopts the name found in the
# response (response.queue) as this queue's name.
#
# A passive declare is used when the :passive option is set; otherwise the
# queue is declared with its durable/exclusive/auto_delete/arguments options.
#
# @return [Object] the queue name taken from the declare response
def declare!
  declare_ok =
    if @options[:passive]
      @channel.queue_declare_passive(@name)
    else
      @channel.queue_declare(@name, *@options.values_at(:durable, :exclusive, :auto_delete, :arguments))
    end

  @name = declare_ok.queue
end
#delete(if_unused = false, if_empty = false) ⇒ Object
Deletes the queue
135 136 137 |
# File 'lib/march_hare/queue.rb', line 135
# Deletes the queue by delegating to the channel's queue_delete.
#
# @param if_unused [Boolean] forwarded to the channel as given
# @param if_empty [Boolean] forwarded to the channel as given
# @return [Object] whatever the channel's queue_delete returns
def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(
    @name,
    if_unused,
    if_empty
  )
end
#durable? ⇒ Boolean
Returns true if this queue was declared as durable (will survive broker restart).
50 51 52 |
# File 'lib/march_hare/queue.rb', line 50
# Reports the durable flag held in @durable.
#
# @return [Boolean] true if this queue was declared as durable
#   (will survive broker restart)
def durable?
  @durable
end
#exclusive? ⇒ Boolean
Returns true if this queue was declared as exclusive (limited to just one consumer).
56 57 58 |
# File 'lib/march_hare/queue.rb', line 56
# Reports the exclusive flag held in @exclusive.
#
# @return [Boolean] true if this queue was declared as exclusive
def exclusive?
  @exclusive
end
#get(options = {:block => false}) ⇒ Object Also known as: pop
146 147 148 149 150 151 152 153 154 |
# File 'lib/march_hare/queue.rb', line 146
# Fetches a single message from the queue via the channel's basic_get.
#
# @param options [Hash] fetch options
# @option options [Boolean] :ack (false) its negation is passed as the second
#   argument of basic_get (presumably the auto-acknowledge flag; confirm
#   against the channel API)
# @return [Array(Headers, String), nil] a pair of message headers and payload,
#   or nil when basic_get returns nothing
#
# NOTE(review): the default options carry :block => false, but :block is not
# read anywhere in this method.
def get(options = {:block => false})
  response = @channel.basic_get(@name, !options.fetch(:ack, false))
  return nil unless response

  [
    Headers.new(@channel, nil, response.envelope, response.props),
    # The body arrives as Java bytes (JRuby); convert it to a Ruby String.
    String.from_java_bytes(response.body)
  ]
end
#message_count ⇒ Integer
Returns how many messages the queue has ready (i.e. not yet delivered to a consumer, as opposed to delivered but unacknowledged).
202 203 204 205 |
# File 'lib/march_hare/queue.rb', line 202
# Asks the channel for a passive declare of this queue and reads the
# message count off the response.
#
# @return [Integer] how many messages the queue has ready
def message_count
  response = @channel.queue_declare_passive(@name)
  response.message_count
end
#predefined? ⇒ Boolean
Returns true if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
230 231 232 |
# File 'lib/march_hare/queue.rb', line 230
# Checks the queue name for the "amq." prefix.
#
# @return [Boolean] true if the name starts with "amq."
def predefined?
  @name.start_with?("amq.")
end
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish
218 219 220 221 222 |
# File 'lib/march_hare/queue.rb', line 218
# Publishes a message to the queue via the channel's default exchange,
# using the queue name as the routing key.
#
# @param payload [Object] message payload, handed to the exchange as given
# @param opts [Hash] publishing options; any :routing_key in it is replaced
#   by the queue name (the caller's hash is not modified)
# @return [self]
def publish(payload, opts = {})
  routed_opts = opts.merge(:routing_key => @name)
  @channel.default_exchange.publish(payload, routed_opts)

  self
end
#purge ⇒ Object
Purges a queue (removes all messages from it)
142 143 144 |
# File 'lib/march_hare/queue.rb', line 142
# Purges the queue (removes all messages from it) by delegating to the
# channel's queue_purge.
#
# @return [Object] whatever the channel's queue_purge returns
def purge
  @channel.queue_purge(@name)
end
#recover_bindings ⇒ Object
259 260 261 262 263 264 265 |
# File 'lib/march_hare/queue.rb', line 259
# Replays every recorded binding through #bind, passing the recorded hash
# itself as the bind options.
#
# @return [Object] the @bindings collection (the receiver of each)
def recover_bindings
  @bindings.each do |recorded|
    # TODO: use a logger
    # puts "Recovering binding #{recorded.inspect}"
    bind(recorded[:exchange], recorded)
  end
end
#recover_from_network_failure ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/march_hare/queue.rb', line 244
# Re-establishes this queue after a network failure: redeclares it,
# registers it with the channel again and replays its bindings.
#
# @return [Object] the result of #recover_bindings
def recover_from_network_failure
  if server_named?
    # Reset the name to "" before redeclaring and drop the channel's
    # registration under the previous name.
    previous_name = @name.dup
    @name = ""
    @channel.deregister_queue_named(previous_name)
  end

  declare! unless predefined?

  @channel.register_queue(self)
  recover_bindings
end
#server_named? ⇒ Boolean
Returns true if this queue was declared as server named.
68 69 70 |
# File 'lib/march_hare/queue.rb', line 68
# Reports the server-named flag held in @server_named.
#
# @return [Boolean] true if this queue was declared as server named
def server_named?
  @server_named
end
#status ⇒ Array<Integer>
Returns A pair with information about the number of queue messages and consumers.
196 197 198 199 |
# File 'lib/march_hare/queue.rb', line 196
# Asks the channel for a passive declare of this queue and reports both
# counters from the single response.
#
# @return [Array<Integer>] a pair: number of queue messages, number of consumers
def status
  response = @channel.queue_declare_passive(@name)
  [response.message_count, response.consumer_count]
end
#subscribe(opts = {}, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
178 179 180 |
# File 'lib/march_hare/queue.rb', line 178
# Adds a consumer to the queue (subscribes for message deliveries).
# The consumer is built with #build_consumer and registered with
# #subscribe_with; the same opts are given to both.
#
# @param opts [Hash] options shared by #build_consumer and #subscribe_with
# @param block [Proc] delivery handler passed on to #build_consumer
# @return [Object] the result of #subscribe_with
def subscribe(opts = {}, &block)
  consumer = build_consumer(opts, &block)
  subscribe_with(consumer, opts)
end
#subscribe_with(consumer, opts = {}) ⇒ Object
182 183 184 185 186 187 188 189 190 191 |
# File 'lib/march_hare/queue.rb', line 182
# Registers an already built consumer on this queue and starts it.
#
# @param consumer [Object] consumer to register; it receives the consumer
#   tag returned by basic_consume and is then started
# @param opts [Hash] subscription options
# @option opts [Boolean] :ack when truthy (or :manual_ack is), false is passed
#   as the second argument of basic_consume; otherwise true
# @option opts [String] :consumer_tag tag forwarded to basic_consume
# @return [Object] the consumer that was passed in
def subscribe_with(consumer, opts = {})
  without_manual_ack = !opts[:ack] && !opts[:manual_ack]
  tag = @channel.basic_consume(@name, without_manual_ack, opts[:consumer_tag], consumer)

  @consumer_tag = tag
  consumer.consumer_tag = tag
  @default_consumer = consumer

  @channel.register_consumer(tag, consumer)
  consumer.start
  consumer
end
#unbind(exchange, options = {}) ⇒ Object
Unbinds queue from an exchange
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/march_hare/queue.rb', line 115
# Unbinds queue from an exchange and forgets the matching recorded binding,
# so that #recover_bindings does not re-create it.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   +name+) or an exchange name
# @param options [Hash] binding options, as given to #bind
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias, and an empty string is used when neither is given
# @option options [Hash] :arguments binding arguments, used to pick the
#   recorded binding to forget
# @return [self]
#
# NOTE(review): :arguments is not forwarded to queue_unbind here; confirm
# whether the channel accepts it before adding it.
def unbind(exchange, options = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  routing_key   = options[:routing_key] || options[:key] || ""
  @channel.queue_unbind(@name, exchange_name, routing_key)

  # A recorded binding may hold a nil routing key (none was given to #bind);
  # treat nil and "" as the same key when looking for the record to drop.
  @bindings.delete_if do |recorded|
    recorded[:exchange] == exchange_name &&
      (recorded[:routing_key] || "") == routing_key &&
      recorded[:arguments] == options[:arguments]
  end

  self
end