Class: MQ::Queue

Inherits:
Object
  • Object
show all
Includes:
AMQP
Defined in:
lib/mq/queue.rb

Constant Summary

Constants included from AMQP

AMQP::DIR, AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AMQP

client, client=, connect, settings, start, stop

Constructor Details

#initialize(mq, name, opts = {}) ⇒ Queue

Returns a new instance of Queue.



5
6
7
8
9
10
11
12
# File 'lib/mq/queue.rb', line 5

# Creates a queue handle and schedules its declaration.
#
# The instance is stored in mq.queues under +name+ unless an entry for that
# name already exists. The Queue::Declare frame is built and sent from inside
# an mq.callback block, with :nowait => true unless +opts+ overrides it.
#
# @param mq [Object] channel-like object providing #queues, #callback and #send
# @param name [Object] queue name, also used as the key into mq.queues
# @param opts [Hash] extra Queue::Declare fields, merged over the defaults
def initialize(mq, name, opts = {})
  @mq = mq
  @name = name
  @mq.queues[@name] ||= self
  @mq.callback do
    declare_args = { :queue => name, :nowait => true }.merge(opts)
    @mq.send(Protocol::Queue::Declare.new(declare_args))
  end
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



13
14
15
# File 'lib/mq/queue.rb', line 13

# Read-only accessor for the queue name.
#
# @return [Object] the value stored in @name by #initialize
def name
  @name
end

Instance Method Details

#bind(exchange, opts = {}) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/mq/queue.rb', line 15

# Binds this queue to an exchange.
#
# The Queue::Bind frame is built and sent from inside an @mq.callback block,
# with :nowait => true unless +opts+ overrides it.
#
# @param exchange [#name, Object] an object responding to #name (its name is
#   sent) or the exchange name itself
# @param opts [Hash] extra Queue::Bind fields; the :key entry is sent as
#   :routing_key. The hash passed in is left untouched.
# @return [self] so that calls can be chained
def bind exchange, opts = {}
  # Work on a private copy: deleting :key from the caller's hash would leak
  # the mutation back to the caller and drop the routing key whenever the
  # same options hash is reused for another bind.
  bind_opts = opts.dup
  routing_key = bind_opts.delete(:key)
  @mq.callback{
    @mq.send Protocol::Queue::Bind.new({ :queue => name,
                                         :exchange => exchange.respond_to?(:name) ? exchange.name : exchange,
                                         :routing_key => routing_key,
                                         :nowait => true }.merge(bind_opts))
  }
  self
end

#publish(data, opts = {}) ⇒ Object



36
37
38
# File 'lib/mq/queue.rb', line 36

# Publishes +data+ by delegating to the object returned by #exchange
# (defined outside this excerpt).
#
# @param data [Object] payload handed to exchange.publish unchanged
# @param opts [Hash] options handed to exchange.publish unchanged
# @return [Object] whatever exchange.publish returns
def publish(data, opts = {})
  target = exchange
  target.publish(data, opts)
end

#receive(headers, body) ⇒ Object



40
41
42
43
44
# File 'lib/mq/queue.rb', line 40

# Hands a message to the handler stored in @on_msg, if there is one.
#
# A handler whose arity is exactly 1 is called with the body only; any other
# handler is called with headers and body.
#
# @param headers [Object] message headers
# @param body [Object] message payload
# @return [Object, nil] the handler's return value, or nil when no handler is set
def receive(headers, body)
  return unless @on_msg

  handler_args = @on_msg.arity == 1 ? [body] : [headers, body]
  @on_msg.call(*handler_args)
end

#subscribe(opts = {}, &blk) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/mq/queue.rb', line 25

# Stores +blk+ as the message handler (the one #receive calls) and schedules
# a Basic::Consume frame for this queue.
#
# The frame is built and sent from inside an @mq.callback block. The queue
# name doubles as the consumer tag, and :no_ack and :nowait default to true;
# any of these can be overridden through +opts+.
#
# @param opts [Hash] extra Basic::Consume fields, merged over the defaults
# @param blk [Proc] handler invoked for received messages
# @return [self] so that calls can be chained
def subscribe(opts = {}, &blk)
  @on_msg = blk
  @mq.callback do
    consume_args = { :queue => name,
                     :consumer_tag => name,
                     :no_ack => true,
                     :nowait => true }.merge(opts)
    @mq.send(Protocol::Basic::Consume.new(consume_args))
  end
  self
end