Class: Brown::Receiver
Instance Method Summary
collapse
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
#backtrace, #log_level, #logger
Methods included from AmqpErrors
#error_lookup, #error_message, #errors
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
Returns a new instance of Receiver.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/brown/receiver.rb', line 10
# Builds a receiver for a queue and starts opening its channel and queue.
#
# queue_def - a Brown::QueueDefinition, or a value that is handed to
#             Brown::QueueDefinition.new together with opts.
# opts      - options hash; kept in @options and also passed to the
#             Brown::Sender used for requeueing.
# blk       - optional block, called with this receiver after open_channel
#             has been invoked.
#
# The channel and the queue are published through @channel_completion and
# @queue_completion; the other methods register callbacks on those.
def initialize(queue_def, opts={}, &blk)
  @queue_def =
    if queue_def.is_a?(Brown::QueueDefinition)
      queue_def
    else
      Brown::QueueDefinition.new(queue_def, opts)
    end

  @acl_type_cache = Brown::ACLLookup
  @options = opts
  @requeue_options = {}
  @requeue_queue = Brown::Sender.new(@queue_def, opts)

  @payload_type = Array(option_or_default(@queue_def.options, :type, []))
  prefetch_count = option_or_default(@queue_def.options, :prefetch, 1)

  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new

  open_channel(:prefetch => prefetch_count) do |channel|
    logger.debug { "channel open for receiver on #{@queue_def.denormalise}" }

    # Channel-level errors are only logged here.
    channel.on_error do |_ch, close|
      logger.fatal { "Channel error: #{close.inspect}" }
    end

    channel.queue(@queue_def.normalise) do |queue|
      logger.debug { "Registered queue #{@queue_def.denormalise} on channel" }
      @queue_completion.succeed(queue)
    end

    @channel_completion.succeed(channel)
  end

  blk.call(self) unless blk.nil?
end
|
Instance Method Details
#ack(multiple = false) ⇒ Object
44
45
46
|
# File 'lib/brown/receiver.rb', line 44
# Registers a completion callback on @channel_completion that calls
# channel.ack.
#
# multiple - forwarded unchanged to channel.ack (defaults to false).
def ack(multiple=false)
  @channel_completion.completion do |channel|
    channel.ack(multiple)
  end
end
|
#delete(&blk) ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/brown/receiver.rb', line 110
# Tears the receiver down once both the queue and the channel completions
# have fired. Each step runs from the callback of the previous one:
#
#   1. unbind the queue from the exchange
#   2. delete the queue
#   3. delete the exchange
#   4. close the channel, passing blk on to channel.close
#
# NOTE(review): `exchange` is not defined in the visible part of this class;
# presumably it is provided elsewhere - confirm.
def delete(&blk)
  @queue_completion.completion do |queue|
    @channel_completion.completion do |channel|
      # The steps are declared last-to-first so each can reference the next.
      close_channel   = proc { channel.close(&blk) }
      delete_exchange = proc { exchange.delete(&close_channel) }
      delete_queue    = proc { queue.delete(&delete_exchange) }

      queue.unbind(exchange, &delete_queue)
    end
  end
end
|
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
99
100
101
102
103
104
|
# File 'lib/brown/receiver.rb', line 99
# Define a channel error handler.
#
# chain - accepted but not used by this method.
# blk   - handler passed on to channel.on_error when @channel_completion
#         fires.
def on_error(chain=false, &blk)
  @channel_completion.completion { |channel| channel.on_error(&blk) }
end
|
#on_requeue(&blk) ⇒ Object
75
76
77
|
# File 'lib/brown/receiver.rb', line 75
# Stores blk in the requeue options under the :on_requeue key and returns it.
def on_requeue(&blk)
  @requeue_options.store(:on_requeue, blk)
end
|
#on_requeue_limit(&blk) ⇒ Object
79
80
81
|
# File 'lib/brown/receiver.rb', line 79
# Stores blk in the requeue options under the :on_requeue_limit key and
# returns it.
def on_requeue_limit(&blk)
  @requeue_options.store(:on_requeue_limit, blk)
end
|
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options set :ack to false.
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/brown/receiver.rb', line 86
# Pops a single message off the queue once @queue_completion fires.
#
# When a payload is present the message is handed to on_message together
# with blk; when there is no payload blk is called with (nil, nil).
def pop(&blk)
  @queue_completion.completion do |queue|
    queue.pop({}) do |metadata, payload|
      # No payload: tell the caller directly.
      next blk.call(nil, nil) unless payload

      on_message(metadata, payload, &blk)
    end
  end
end
|
#queue_name ⇒ Object
106
107
108
|
# File 'lib/brown/receiver.rb', line 106
# Returns the denormalised form of this receiver's queue definition.
def queue_name
  definition = @queue_def
  definition.denormalise
end
|
#requeue_parameters(opts) ⇒ Object
71
72
73
|
# File 'lib/brown/receiver.rb', line 71
# Merges opts into the stored requeue options in place (keys in opts win)
# and returns the updated options hash.
def requeue_parameters(opts)
  @requeue_options.update(opts)
end
|
#status(&blk) ⇒ Object
124
125
126
127
128
129
130
|
# File 'lib/brown/receiver.rb', line 124
# Asks the queue for its status once @queue_completion fires and passes the
# two values it yields (message count, consumer count) to blk.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status { |message_count, consumer_count| blk.call(message_count, consumer_count) }
  end
end
|
#subscribe(opts = {}, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options set :ack to false.
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/brown/receiver.rb', line 51
# Subscribes to the queue once @queue_completion fires.
#
# opts - options passed to queue.subscribe. :ack => true is always merged in
#        last, so any :ack value supplied by the caller is overridden.
# blk  - handed to on_message together with the metadata and payload of each
#        message that has a payload.
#
# Messages without a payload are only logged.
def subscribe(opts = {}, &blk)
  @queue_completion.completion do |queue|
    logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{@queue_def.options}" }

    subscribe_opts = opts.merge(:ack => true)
    queue.subscribe(subscribe_opts) do |metadata, payload|
      logger.debug { "Received a message on #{@queue_def.denormalise}: #{metadata.to_hash.inspect}" }
      next on_message(metadata, payload, &blk) if payload

      logger.debug { "Received null message on: #{@queue_def.denormalise} [options]:#{@queue_def.options}" }
    end
  end
end
|
#unsubscribe(&blk) ⇒ Object
65
66
67
68
69
|
# File 'lib/brown/receiver.rb', line 65
# Unsubscribes from the queue once @queue_completion fires, passing blk on to
# queue.unsubscribe.
def unsubscribe(&blk)
  @queue_completion.completion { |queue| queue.unsubscribe(&blk) }
end
|