Class: MQ::Queue
Constant Summary
Constants included from AMQP
AMQP::DIR, AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #bind(exchange, opts = {}) ⇒ Object
- #initialize(mq, name, opts = {}) ⇒ Queue (constructor)
  Returns a new instance of Queue.
- #publish(data, opts = {}) ⇒ Object
- #receive(headers, body) ⇒ Object
- #subscribe(opts = {}, &blk) ⇒ Object
Methods included from AMQP
client, client=, connect, settings, start, stop
Constructor Details
#initialize(mq, name, opts = {}) ⇒ Queue
Returns a new instance of Queue.
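    # File 'lib/mq/queue.rb', line 5
    def initialize mq, name, opts = {}
      @mq = mq
      # Register under the queue name; an entry already stored for that name is kept.
      @mq.queues[@name = name] ||= self
      # Deferred through @mq.callback; entries in opts override the defaults.
      @mq.callback{
        @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                                :nowait => true }.merge(opts))
      }
    end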
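The `||=` in the constructor means the first queue created under a given name stays registered in `@mq.queues`, while a later instance with the same name is still constructed and still sends its own declare. A minimal sketch of that behavior follows. It assumes a hypothetical `FakeChannel` stand-in for the `MQ` object (its `callback` runs the block immediately, which the real object may not do) and uses a plain Hash in place of `Protocol::Queue::Declare`; `SketchQueue` and the `:durable` option are illustrative only.

```ruby
# Hypothetical stand-in for the MQ channel object; not part of the library.
class FakeChannel
  attr_reader :queues, :sent

  def initialize
    @queues = {}
    @sent = []
  end

  # Assumption: run the block immediately instead of deferring it.
  def callback
    yield
  end

  # Record the frame instead of writing it to a broker connection.
  def send(frame)
    @sent << frame
  end
end

# Trimmed copy of the constructor logic shown above.
class SketchQueue
  attr_reader :name

  def initialize(mq, name, opts = {})
    @mq = mq
    @mq.queues[@name = name] ||= self
    @mq.callback do
      @mq.send({ :queue => name, :nowait => true }.merge(opts))
    end
  end
end

channel = FakeChannel.new
first   = SketchQueue.new(channel, 'tasks')
second  = SketchQueue.new(channel, 'tasks', :durable => true)

channel.queues['tasks'].equal?(first)  # => true, the first instance stays registered
channel.sent.size                      # => 2, one declare per instance
```

In this sketch the second declare carries `:durable => true` even though the registry still points at the first instance, so the two can disagree about the options a queue was declared with.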
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/mq/queue.rb', line 13
    def name
      @name
    end
Instance Method Details
#bind(exchange, opts = {}) ⇒ Object
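    # File 'lib/mq/queue.rb', line 15
    def bind exchange, opts = {}
      @mq.callback{
        # exchange may be an object that responds to #name or the exchange name itself.
        # Note: opts.delete(:key) removes :key from the hash the caller passed in.
        @mq.send Protocol::Queue::Bind.new({ :queue => name,
                                             :exchange => exchange.respond_to?(:name) ? exchange.name : exchange,
                                             :routing_key => opts.delete(:key),
                                             :nowait => true }.merge(opts))
      }
      # Returns the queue itself, so calls can be chained.
      self
    end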
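Two details of `#bind` are easy to miss: the exchange can be given either as an object with a `name` method or as a plain name, and the `:key` option is removed from the caller's hash with `opts.delete`. The sketch below rebuilds only the argument hash, without any broker or protocol classes. `bind_arguments` and `NamedExchange` are hypothetical names introduced for illustration.

```ruby
# Hypothetical helper that builds the argument hash the same way #bind does.
def bind_arguments(queue_name, exchange, opts = {})
  {
    :queue       => queue_name,
    :exchange    => exchange.respond_to?(:name) ? exchange.name : exchange,
    :routing_key => opts.delete(:key),
    :nowait      => true
  }.merge(opts)
end

# Any object with a #name method can stand in for an exchange here.
NamedExchange = Struct.new(:name)

by_object = bind_arguments('tasks', NamedExchange.new('logs'), :key => 'error')
by_string = bind_arguments('tasks', 'logs', :key => 'error')
by_object == by_string        # => true, both forms produce the same arguments

# Side effect: the caller's options hash loses its :key entry.
caller_opts = { :key => 'error' }
bind_arguments('tasks', 'logs', caller_opts)
caller_opts                   # => {}
```

A caller that reuses one options hash for several `bind` calls would therefore bind with a `nil` routing key from the second call on; passing a fresh hash (or `opts.dup`) each time avoids that.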
#publish(data, opts = {}) ⇒ Object
    # File 'lib/mq/queue.rb', line 36
    def publish data, opts = {}
      # Delegates to an exchange helper that is not listed in the summary above.
      exchange.publish(data, opts)
    end
#receive(headers, body) ⇒ Object
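    # File 'lib/mq/queue.rb', line 40
    def receive headers, body
      # Does nothing unless a block was registered with #subscribe.
      if @on_msg
        # A one-argument block receives only the body; any other arity gets headers and body.
        @on_msg.call *(@on_msg.arity == 1 ? [body] : [headers, body])
      end
    end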
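The arity check in `#receive` decides what a subscriber block sees: a block taking exactly one parameter gets the body alone, and every other block gets the headers followed by the body. A minimal sketch of that dispatch in plain Ruby, where `dispatch` is a hypothetical helper and the header hash is made-up sample data:

```ruby
# Hypothetical helper mirroring the dispatch logic in #receive.
def dispatch(on_msg, headers, body)
  return unless on_msg
  on_msg.call(*(on_msg.arity == 1 ? [body] : [headers, body]))
end

seen = []
body_only    = proc { |body| seen << [:body_only, body] }
with_headers = proc { |headers, body| seen << [:with_headers, headers, body] }

dispatch(body_only,    { :type => 'greeting' }, 'hello')
dispatch(with_headers, { :type => 'greeting' }, 'hello')
dispatch(nil,          { :type => 'greeting' }, 'dropped')  # no block registered: ignored

seen.length  # => 2
```

A block with a splat parameter (`|*args|`) reports an arity of -1, so in this scheme it receives both headers and body.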
#subscribe(opts = {}, &blk) ⇒ Object
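    # File 'lib/mq/queue.rb', line 25
    def subscribe opts = {}, &blk
      # Stores the block for #receive; a later call to #subscribe replaces it.
      @on_msg = blk
      @mq.callback{
        # The consumer tag defaults to the queue name; entries in opts override the defaults.
        @mq.send Protocol::Basic::Consume.new({ :queue => name,
                                                :consumer_tag => name,
                                                :no_ack => true,
                                                :nowait => true }.merge(opts))
      }
      # Returns the queue itself, so calls can be chained.
      self
    end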
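Because the defaults in `#subscribe` are merged with `opts` last, any default (including `:no_ack` and `:consumer_tag`) can be overridden by the caller. The sketch below rebuilds only that argument hash. `consume_arguments` is a hypothetical helper and `'worker-1'` is an illustrative tag; whether the rest of the library acknowledges messages when `:no_ack` is false is not shown on this page.

```ruby
# Hypothetical helper that builds the Basic::Consume arguments the way #subscribe does.
def consume_arguments(queue_name, opts = {})
  {
    :queue        => queue_name,
    :consumer_tag => queue_name,
    :no_ack       => true,
    :nowait       => true
  }.merge(opts)
end

defaults = consume_arguments('tasks')
defaults[:consumer_tag]   # => "tasks"
defaults[:no_ack]         # => true

custom = consume_arguments('tasks', :no_ack => false, :consumer_tag => 'worker-1')
custom[:consumer_tag]     # => "worker-1"
custom[:no_ack]           # => false
```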