Class: Smith::Messaging::Receiver
- Inherits:
-
Object
- Object
- Smith::Messaging::Receiver
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Method Summary collapse
- #ack(multiple = false) ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
constructor
A new instance of Receiver.
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
- #on_requeue(&blk) ⇒ Object
- #on_requeue_limit(&blk) ⇒ Object
-
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block.
- #queue_name ⇒ Object
- #requeue_parameters(opts = {}) ⇒ Object
- #setup_reply_queue(reply_queue_name, &blk) ⇒ Object
- #status(&blk) ⇒ Object
-
#subscribe(&blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block.
- #unsubscribe(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
Returns a new instance of Receiver.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/smith/messaging/receiver.rb', line 13
#
# Builds a receiver for +queue_def+: opens the channel(s), declares the direct
# exchange and the queue, binds them, and resolves the internal completions
# that every other method on this class waits on.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or (for
#   backward compatibility) whatever QueueDefinition.new accepts as its
#   first argument
# @param opts [Hash] only used when +queue_def+ is not already a
#   QueueDefinition; passed to QueueDefinition.new
# @yield [receiver] called synchronously with +self+ at the end of the
#   constructor, if a block is given
def initialize(queue_def, opts={}, &blk)
  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # NOTE(review): the accessor name after `@queue_def.` was lost when this
  # listing was extracted; `options` is assumed here and below -- confirm
  # against lib/smith/messaging/receiver.rb.
  @foo_options = {
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  # Cache of reply-queue senders, keyed by reply queue name.
  @reply_queues = {}

  # First channel: declares the exchange. This is the channel that
  # @channel_completion resolves with.
  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  # Second channel: declares the queue and binds it once the exchange exists.
  # NOTE(review): this channel is not the one stored in @channel_completion.
  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(@queue_def.normalise, @options.queue) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  blk.call(self) if blk
end
Instance Method Details
#ack(multiple = false) ⇒ Object
60 61 62 |
# File 'lib/smith/messaging/receiver.rb', line 60
#
# Calls +ack+ on the receiver's channel once @channel_completion has a
# channel to hand over.
#
# @param multiple [Boolean] forwarded unchanged to +channel.ack+
def ack(multiple=false)
  @channel_completion.completion do |channel|
    channel.ack(multiple)
  end
end
#delete(&blk) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/smith/messaging/receiver.rb', line 159
#
# Tears the receiver down. Once the exchange, queue and channel are all
# available it runs, strictly in this order, each step inside the previous
# step's callback: unbind the queue from the exchange, delete the queue,
# delete the exchange, close the channel.
#
# @yield passed to +channel.close+ as its block
def delete(&blk)
  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion do |channel|
        queue.unbind(exchange) do
          queue.delete do
            exchange.delete do
              channel.close(&blk)
            end
          end
        end
      end
    end
  end
end
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
129 130 131 132 133 134 |
# File 'lib/smith/messaging/receiver.rb', line 129
#
# Define a channel error handler.
#
# @param chain [Boolean] accepted but never read by this method
# @yield registered via +channel.on_error+ once the channel is available
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion do |channel|
    channel.on_error(&blk)
  end
end
#on_requeue(&blk) ⇒ Object
189 190 191 192 193 |
# File 'lib/smith/messaging/receiver.rb', line 189
#
# Stores +blk+ under the +:on_requeue+ key of the requeue options hash once
# that hash is available. The hash is mutated in place (+merge!+).
#
# @yield the callback to store; this method does not invoke it
def on_requeue(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue => blk)
  end
end
#on_requeue_limit(&blk) ⇒ Object
195 196 197 198 199 |
# File 'lib/smith/messaging/receiver.rb', line 195
#
# Stores +blk+ under the +:on_requeue_limit+ key of the requeue options hash
# once that hash is available. The hash is mutated in place (+merge!+).
#
# @yield the callback to store; this method does not invoke it
def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue_limit => blk)
  end
end
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block. pop
will automatically acknowledge the message unless the options set :ack to false.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/smith/messaging/receiver.rb', line 113
#
# Pops a message off the queue and passes the headers and payload into the
# block. pop will automatically acknowledge the message unless the options
# set :ack to false.
#
# When the queue yields no payload the block is called with (nil, nil).
#
# @yield [metadata, payload] for a real message this is reached through the
#   message handler; for an empty pop it is called directly with nils
def pop(&blk)
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      queue.pop(opts) do |metadata, payload|
        if payload
          # NOTE(review): the callee name and the block/argument names were
          # lost when this listing was extracted; `on_message` is assumed --
          # confirm against lib/smith/messaging/receiver.rb.
          on_message(metadata, payload, requeue_options, &blk)
        else
          blk.call(nil, nil)
        end
      end
    end
  end
end
#queue_name ⇒ Object
155 156 157 |
# File 'lib/smith/messaging/receiver.rb', line 155
#
# @return [Object] whatever +@queue_def.denormalise+ returns
def queue_name
  @queue_def.denormalise
end
#requeue_parameters(opts = {}) ⇒ Object
183 184 185 186 187 |
# File 'lib/smith/messaging/receiver.rb', line 183
#
# Merges +opts+ into the requeue options hash once that hash is available.
# The hash is mutated in place (+merge!+), so keys in +opts+ overwrite
# existing keys of the same name.
#
# @param opts [Hash] entries to merge in
def requeue_parameters(opts={})
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(opts)
  end
end
#setup_reply_queue(reply_queue_name, &blk) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/smith/messaging/receiver.rb', line 64
#
# Yields a Sender attached to the named reply queue. Senders are cached per
# queue name in @reply_queues, so a repeat call with the same name yields
# the cached sender immediately instead of creating a new one.
#
# @param reply_queue_name [Object] key for the cache and the name passed to
#   QueueDefinition.new
# @yield [sender] the cached or newly created Smith::Messaging::Sender
def setup_reply_queue(reply_queue_name, &blk)
  if @reply_queues[reply_queue_name]
    blk.call(@reply_queues[reply_queue_name])
  else
    # The exchange itself is not used here; the completion only delays the
    # work until the exchange has been declared.
    @exchange_completion.completion do |_exchange|
      logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

      queue_def = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

      Smith::Messaging::Sender.new(queue_def) do |sender|
        @reply_queues[reply_queue_name] = sender
        blk.call(sender)
      end
    end
  end
end
#status(&blk) ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/smith/messaging/receiver.rb', line 175
#
# Asks the queue for its status once the queue is available and hands both
# values that +queue.status+ yields on to the block, in the same order.
#
# @yield [num_messages, num_consumers] the two values from +queue.status+.
#   NOTE(review): the first name was lost in extraction; `num_messages` is
#   inferred from the surviving `num_consumers` -- confirm.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end
#subscribe(&blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block. subscribe
will automatically acknowledge the message unless the options set :ack to false.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/smith/messaging/receiver.rb', line 84
#
# Subscribes to a queue and passes the headers and payload into the block.
# subscribe will automatically acknowledge the message unless the options
# set :ack to false.
#
# If the queue already reports itself as subscribed, nothing is subscribed
# and an error is logged instead. Deliveries without a payload are logged at
# verbose level and otherwise ignored.
#
# @yield [metadata, payload] reached through the message handler for every
#   delivery that has a payload
def subscribe(&blk)
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if queue.subscribed?
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      else
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        queue.subscribe(opts) do |metadata, payload|
          if payload
            # NOTE(review): the callee name and the block/argument names
            # were lost when this listing was extracted; `on_message` is
            # assumed -- confirm against lib/smith/messaging/receiver.rb.
            on_message(metadata, payload, requeue_options, &blk)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      end
    end
  end
end
#unsubscribe(&blk) ⇒ Object
104 105 106 107 108 |
# File 'lib/smith/messaging/receiver.rb', line 104
#
# Calls +unsubscribe+ on the queue once the queue is available.
#
# @yield passed straight through to +queue.unsubscribe+
def unsubscribe(&blk)
  @queue_completion.completion do |queue|
    queue.unsubscribe(&blk)
  end
end