Class: Smith::Messaging::Receiver
- Inherits:
-
Object
- Object
- Smith::Messaging::Receiver
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Method Summary collapse
- #ack(multiple = false) ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
constructor
A new instance of Receiver.
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
- #on_requeue(&blk) ⇒ Object
- #on_requeue_limit(&blk) ⇒ Object
-
#pop(&blk) ⇒ Object
pops a message off the queue and passes the headers and payload into the block.
- #queue_name ⇒ Object
- #requeue_parameters(opts = {}) ⇒ Object
- #setup_reply_queue(reply_queue_name, &blk) ⇒ Object
- #status(&blk) ⇒ Object
-
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block.
- #unsubscribe(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
Returns a new instance of Receiver.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/smith/messaging/receiver.rb', line 13 def initialize(queue_def, opts={}, &blk) # This is for backward compatibility. @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts) @acl_type_cache = AclTypeCache.instance @foo_options = { :auto_ack => option_or_default(@queue_def., :auto_ack, true), :threading => option_or_default(@queue_def., :threading, false)} @payload_type = Array(option_or_default(@queue_def., :type, [])) prefetch = option_or_default(@queue_def., :prefetch, Smith.config.agent.prefetch) @options = AmqpOptions.new(@queue_def.) @options.routing_key = @queue_def.normalise @message_counter = MessageCounter.new(@queue_def.denormalise) @channel_completion = EM::Completion.new @queue_completion = EM::Completion.new @exchange_completion = EM::Completion.new @requeue_options_completion = EM::Completion.new @reply_queues = {} open_channel(:prefetch => prefetch) do |channel| @channel_completion.succeed(channel) channel.direct(@queue_def.normalise, @options.exchange) do |exchange| @exchange_completion.succeed(exchange) end end open_channel(:prefetch => prefetch) do |channel| channel.queue(@queue_def.normalise, @options.queue) do |queue| @exchange_completion.completion do |exchange| queue.bind(exchange, :routing_key => @queue_def.normalise) @queue_completion.succeed(queue) @requeue_options_completion.succeed(:exchange => exchange, :queue => queue) end end end blk.call(self) if blk end |
Instance Method Details
#ack(multiple = false) ⇒ Object
60 61 62 |
# File 'lib/smith/messaging/receiver.rb', line 60 def ack(multiple=false) @channel_completion.completion {|channel| channel.ack(multiple) } end |
#delete(&blk) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/smith/messaging/receiver.rb', line 162 def delete(&blk) @exchange_completion.completion do |exchange| @queue_completion.completion do |queue| @channel_completion.completion do |channel| queue.unbind(exchange) do queue.delete do exchange.delete do channel.close(&blk) end end end end end end end |
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
132 133 134 135 136 137 |
# File 'lib/smith/messaging/receiver.rb', line 132 def on_error(chain=false, &blk) # TODO Check that this chains callbacks @channel_completion.completion do |channel| channel.on_error(&blk) end end |
#on_requeue(&blk) ⇒ Object
192 193 194 195 196 |
# File 'lib/smith/messaging/receiver.rb', line 192 def on_requeue(&blk) @requeue_options_completion.completion do || .merge!(:on_requeue => blk) end end |
#on_requeue_limit(&blk) ⇒ Object
198 199 200 201 202 |
# File 'lib/smith/messaging/receiver.rb', line 198 def on_requeue_limit(&blk) @requeue_options_completion.completion do || .merge!(:on_requeue_limit => blk) end end |
#pop(&blk) ⇒ Object
pops a message off the queue and passes the headers and payload into the block. pop
will automatically acknowledge the message unless the options sets :ack to false.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/smith/messaging/receiver.rb', line 116 def pop(&blk) opts = @options.pop @queue_completion.completion do |queue| @requeue_options_completion.completion do || queue.pop(opts) do |, payload| if payload (, payload, , &blk) else blk.call(nil,nil) end end end end end |
#queue_name ⇒ Object
158 159 160 |
# File 'lib/smith/messaging/receiver.rb', line 158 def queue_name @queue_def.denormalise end |
#requeue_parameters(opts = {}) ⇒ Object
186 187 188 189 190 |
# File 'lib/smith/messaging/receiver.rb', line 186 def requeue_parameters(opts={}) @requeue_options_completion.completion do || .merge!(opts) end end |
#setup_reply_queue(reply_queue_name, &blk) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/smith/messaging/receiver.rb', line 64 def setup_reply_queue(reply_queue_name, &blk) if @reply_queues[reply_queue_name] blk.call(@reply_queues[reply_queue_name]) else @exchange_completion.completion do |exchange| logger.debug { "Attaching to reply queue: #{reply_queue_name}" } queue_def = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false) Smith::Messaging::Sender.new(queue_def) do |sender| @reply_queues[reply_queue_name] = sender blk.call(sender) end end end end |
#status(&blk) ⇒ Object
178 179 180 181 182 183 184 |
# File 'lib/smith/messaging/receiver.rb', line 178 def status(&blk) @queue_completion.completion do |queue| queue.status do |, num_consumers| blk.call(, num_consumers) end end end |
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block. subscribe
will automatically acknowledge the message unless the options sets :ack to false.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/smith/messaging/receiver.rb', line 84 def subscribe(handler=nil, &blk) blk = handler || blk @queue_completion.completion do |queue| @requeue_options_completion.completion do || if !queue.subscribed? opts = @options.subscribe logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" } queue.subscribe(opts) do |,payload| if payload (, payload, , &blk) else logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" } end end else logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" } end end end end |
#unsubscribe(&blk) ⇒ Object
107 108 109 110 111 |
# File 'lib/smith/messaging/receiver.rb', line 107 def unsubscribe(&blk) @queue_completion.completion do |queue| queue.unsubscribe(&blk) end end |