Class: Droid::BaseQueue
- Inherits:
-
Object
show all
- Defined in:
- lib/droid/queue.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(queue_name, opts = {}) ⇒ BaseQueue
8
9
10
11
12
|
# File 'lib/droid/queue.rb', line 8
# Stores the queue name and a private copy of the queue options.
#
# Queues default to auto-delete: :auto_delete is forced to true unless the
# caller passed an explicit `:auto_delete => false`.
#
# @param queue_name [Object] queue name, kept as given; #setup later passes it to MQ#queue
# @param opts [Hash] queue options; the caller's hash is left untouched
#   (a shallow copy is stored in @opts), so frozen or shared hashes are safe to pass
def initialize(queue_name, opts={})
  settings = opts.dup
  # Only an explicit false opts out of auto-delete; nil or a missing key do not.
  explicitly_disabled = settings.key?(:auto_delete) && settings[:auto_delete] == false
  settings[:auto_delete] = true unless explicitly_disabled

  @queue_name = queue_name
  @opts = settings
end
|
Instance Attribute Details
#ex ⇒ Object
Returns the value of attribute ex.
6
7
8
|
# File 'lib/droid/queue.rb', line 6
# Reader for @ex.
#
# In the code shown here @ex is assigned only by #setup, from
# `@mq.direct(...)`; before that it reads as nil. #subscribe and #tag both
# check this value for truthiness before using it.
#
# @return [Object, nil] the value of @ex
def ex
@ex
end
|
#mq ⇒ Object
Returns the value of attribute mq.
6
7
8
|
# File 'lib/droid/queue.rb', line 6
# Reader for @mq.
#
# In the code shown here @mq is assigned only by #setup (to `MQ.new`) and is
# closed by #teardown; before #setup runs it reads as nil.
#
# @return [Object, nil] the value of @mq
def mq
@mq
end
|
#opts ⇒ Object
Returns the value of attribute opts.
5
6
7
|
# File 'lib/droid/queue.rb', line 5
# Reader for @opts, the options hash stored by #initialize.
#
# #setup passes this hash to `@mq.queue` and reads its :exchange_name key.
#
# @return [Hash] the value of @opts
def opts
@opts
end
|
#q ⇒ Object
Returns the value of attribute q.
6
7
8
|
# File 'lib/droid/queue.rb', line 6
# Reader for @q.
#
# In the code shown here @q is assigned only by #setup, from
# `@mq.queue(queue_name, opts)`; before that it reads as nil.
#
# @return [Object, nil] the value of @q
def q
@q
end
|
#queue_name ⇒ Object
Returns the value of attribute queue_name.
5
6
7
|
# File 'lib/droid/queue.rb', line 5
# Reader for @queue_name, the name given to #initialize.
#
# #setup uses it as the queue name and, when opts[:exchange_name] is not set,
# as the exchange name too.
#
# @return [Object] the value of @queue_name
def queue_name
@queue_name
end
|
Instance Method Details
#destroy ⇒ Object
66
67
68
|
# File 'lib/droid/queue.rb', line 66
# Delegates to #teardown and returns whatever it returns.
def destroy
teardown
end
|
#log ⇒ Object
25
26
27
|
# File 'lib/droid/queue.rb', line 25
# Returns `Droid.log`. The methods in this class call `info` on the result
# to emit their "amqp_*" log lines.
#
# @return [Object] whatever Droid.log returns
def log
Droid.log
end
|
#setup ⇒ Object
14
15
16
17
18
19
|
# File 'lib/droid/queue.rb', line 14
# Creates a fresh MQ instance and declares this queue and its direct
# exchange on it, storing the three objects in @mq, @q and @ex.
#
# The exchange is named after opts[:exchange_name] when that is set,
# otherwise after the queue itself.
#
# @return [Object] the exchange object that was stored in @ex
def setup
  channel = MQ.new
  @mq = channel
  @q = channel.queue(queue_name, opts)

  exchange_name = opts[:exchange_name] || queue_name
  @ex = channel.direct(exchange_name)
end
|
#subscribe(amqp_opts = {}, opts = {}) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/droid/queue.rb', line 34
# Sets up the queue, binds it to its exchange (when there is one) and starts
# consuming messages.
#
# Each delivery is wrapped in a Droid::Request and processed inside
# Droid::Utilization.monitor. Expired requests are logged as a timeout and
# never reach the caller's block. Any StandardError raised while handling a
# request is passed to Droid.handle_error instead of propagating. In both of
# those cases the request is acked when amqp_opts[:ack] is truthy; on the
# success path this method does not ack.
#
# @param amqp_opts [Hash] passed straight to `q.subscribe`; its :ack key is
#   also read here to decide whether failed/expired requests are acked
# @param opts [Hash] accepted for interface compatibility; not used by this
#   method (it shadows the #opts reader inside the method body)
# @yield [request] called once per non-expired delivery, if a block is given
# @yieldparam request [Droid::Request] the wrapped delivery
# @return [self]
def subscribe(amqp_opts={}, opts={})
  setup
  q.bind(ex) if ex

  # A two-argument subscribe block receives the delivery header first and the
  # message body second; both are handed to Droid::Request.
  q.subscribe(amqp_opts) do |header, message|
    Droid::Utilization.monitor(q.name, :temp => temp?) do
      request = Droid::Request.new(self, header, message)
      log.info "amqp_message #{tag} action=received ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
      begin
        raise Droid::ExpiredMessage if request.expired?
        yield request if block_given?
        finished = Time.now.getgm.to_i
        log.info "amqp_message action=processed #{tag} elapsed=#{finished-request.start} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
      rescue Droid::ExpiredMessage
        log.info "amqp_message action=timeout #{tag} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
        request.ack if amqp_opts[:ack]
      rescue => e
        # Ack before reporting so the failed request is acked even though
        # handling it raised.
        request.ack if amqp_opts[:ack]
        Droid.handle_error(e)
      end
    end
  end

  log.info "amqp_subscribe #{tag}"
  self
end
|
#tag ⇒ Object
29
30
31
32
|
# File 'lib/droid/queue.rb', line 29
# Builds the identifier that prefixes this queue's log lines.
#
# Always returns the string, including when there is no exchange; ending the
# method on the modifier-`if` would return nil in that case.
#
# @return [String] "queue=<name>", plus " exchange=<name>" when #ex is set
def tag
  label = "queue=#{q.name}"
  label += " exchange=#{ex.name}" if ex
  label
end
|
#teardown ⇒ Object
60
61
62
63
64
|
# File 'lib/droid/queue.rb', line 60
# Stops consuming and releases the MQ instance created by #setup: calls
# `unsubscribe` on @q, then `close` on @mq, then logs an "amqp_unsubscribe"
# line carrying #tag.
#
# @return [Object] whatever the logger's `info` call returns
def teardown
  @q.unsubscribe
  @mq.close

  logger = log
  logger.info("amqp_unsubscribe #{tag}")
end
|
#temp? ⇒ Boolean
21
22
23
|
# File 'lib/droid/queue.rb', line 21
# Always false in this class. #subscribe passes the result as the :temp
# option to Droid::Utilization.monitor.
# NOTE(review): presumably subclasses for temporary queues override this —
# confirm against the rest of lib/droid/queue.rb.
#
# @return [Boolean] false
def temp?
false
end
|