Class: Smith::Messaging::Receiver
- Inherits:
-
Object
- Object
- Smith::Messaging::Receiver
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Method Summary collapse
- #ack(multiple = false) ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
constructor
A new instance of Receiver.
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
- #on_requeue(&blk) ⇒ Object
- #on_requeue_limit(&blk) ⇒ Object
-
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block.
- #queue_name ⇒ Object
- #requeue_parameters(opts = {}) ⇒ Object
- #setup_reply_queue(reply_queue_name, &blk) ⇒ Object
- #status(&blk) ⇒ Object
-
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block.
- #unsubscribe(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
Returns a new instance of Receiver.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# NOTE(review): this listing is a one-line extraction of Receiver#initialize and
# several identifiers appear to have been dropped (e.g. the bare `= {`,
# `@queue_def.,`, `.routing_key =`, `= EM::Completion.new`). Do not treat the
# text below as runnable; recover the missing names from
# lib/smith/messaging/receiver.rb before editing.
#
# What the visible text shows: queue_def is wrapped in a QueueDefinition unless
# it already is one; EM::Completion objects are created for the channel, queue
# and exchange; open_channel is invoked twice (one block succeeds the channel
# and exchange completions, the other binds the queue to the exchange and
# succeeds the queue completion); finally blk is called with self if given.
# File 'lib/smith/messaging/receiver.rb', line 13 def initialize(queue_def, opts={}, &blk) # This is for backward compatibility. @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts) @acl_type_cache = AclTypeCache.instance = { :auto_ack => option_or_default(@queue_def., :auto_ack, true), :threading => option_or_default(@queue_def., :threading, false)} = if opts.delete(:fanout) {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)} end @payload_type = Array(option_or_default(@queue_def., :type, [])) prefetch = option_or_default(@queue_def., :prefetch, Smith.config.agent.prefetch) = AmqpOptions.new(@queue_def.) .routing_key = @queue_def.normalise = MessageCounter.new(@queue_def.denormalise) @channel_completion = EM::Completion.new @queue_completion = EM::Completion.new @exchange_completion = EM::Completion.new = EM::Completion.new @reply_queues = {} open_channel(:prefetch => prefetch) do |channel| @channel_completion.succeed(channel) exchange_type = () ? :fanout : :direct channel.send(exchange_type, @queue_def.normalise, .exchange) do |exchange| @exchange_completion.succeed(exchange) end end open_channel(:prefetch => prefetch) do |channel| channel.queue(*fanout_queue_and_opts()) do |queue| @exchange_completion.completion do |exchange| queue.bind(exchange, :routing_key => @queue_def.normalise) @queue_completion.succeed(queue) .succeed(:exchange => exchange, :queue => queue) end end end blk.call(self) if blk end |
Instance Method Details
#ack(multiple = false) ⇒ Object
65 66 67 |
# File 'lib/smith/messaging/receiver.rb', line 65
#
# Acknowledges on the receiver's channel once the channel completion fires.
#
# @param multiple [Boolean] forwarded unchanged to +channel.ack+.
# @return [Object] whatever +@channel_completion.completion+ returns.
def ack(multiple = false)
  @channel_completion.completion do |ch|
    ch.ack(multiple)
  end
end
#delete(&blk) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/smith/messaging/receiver.rb', line 167
#
# Tears the receiver down in a fixed order once the exchange, queue and
# channel completions have all fired: unbind the queue from the exchange,
# delete the queue, delete the exchange, then close the channel.
#
# @yield passed to +channel.close+ as its block (may be omitted).
def delete(&blk)
  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion do |channel|
        # Each step is the callback of the step before it; plain procs are
        # used so any arguments the callbacks receive are ignored.
        close_channel = proc { channel.close(&blk) }
        drop_exchange = proc { exchange.delete(&close_channel) }
        drop_queue    = proc { queue.delete(&drop_exchange) }

        queue.unbind(exchange, &drop_queue)
      end
    end
  end
end
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
137 138 139 140 141 142 |
# File 'lib/smith/messaging/receiver.rb', line 137
#
# Define a channel error handler. The handler block is handed to
# +channel.on_error+ once the channel completion fires.
#
# @param chain [Boolean] accepted but not used by this method body.
# @yield the error handler passed through to +channel.on_error+.
def on_error(chain = false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |ch| ch.on_error(&blk) }
end
#on_requeue(&blk) ⇒ Object
197 198 199 200 201 |
# NOTE(review): extraction appears to have dropped the receiver of
# `.completion`, the block parameter inside `||`, and the receiver of
# `.merge!`. As shown, the method merges { :on_requeue => blk } into whatever
# object the completion yields; confirm the missing names against
# lib/smith/messaging/receiver.rb before editing.
# File 'lib/smith/messaging/receiver.rb', line 197 def on_requeue(&blk) .completion do || .merge!(:on_requeue => blk) end end |
#on_requeue_limit(&blk) ⇒ Object
203 204 205 206 207 |
# NOTE(review): extraction appears to have dropped the receiver of
# `.completion`, the block parameter inside `||`, and the receiver of
# `.merge!`. As shown, the method merges { :on_requeue_limit => blk } into
# whatever object the completion yields; confirm the missing names against
# lib/smith/messaging/receiver.rb before editing.
# File 'lib/smith/messaging/receiver.rb', line 203 def on_requeue_limit(&blk) .completion do || .merge!(:on_requeue_limit => blk) end end |
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options set :ack to false.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# NOTE(review): several identifiers are missing from this extraction (the
# receiver of `.pop` in `opts = .pop`, the receiver and parameter of the inner
# `.completion do ||`, the first parameter of the `queue.pop` block, and the
# name of the helper called as `(, payload, , &blk)`). Recover them from
# lib/smith/messaging/receiver.rb before editing.
#
# What the visible text shows: once the queue completion fires, queue.pop is
# called with opts; when a payload is present it is handed on (with blk) to a
# helper, otherwise blk is called with (nil, nil).
# File 'lib/smith/messaging/receiver.rb', line 121 def pop(&blk) opts = .pop @queue_completion.completion do |queue| .completion do || queue.pop(opts) do |, payload| if payload (, payload, , &blk) else blk.call(nil,nil) end end end end end |
#queue_name ⇒ Object
163 164 165 |
# File 'lib/smith/messaging/receiver.rb', line 163
#
# Name of the queue this receiver is attached to.
#
# @return [Object] the result of +@queue_def.denormalise+.
def queue_name
  @queue_def.denormalise
end
#requeue_parameters(opts = {}) ⇒ Object
191 192 193 194 195 |
# NOTE(review): extraction appears to have dropped the receiver of
# `.completion`, the block parameter inside `||`, and the receiver of
# `.merge!`. As shown, the method merges the caller-supplied opts hash into
# whatever object the completion yields; confirm the missing names against
# lib/smith/messaging/receiver.rb before editing.
# File 'lib/smith/messaging/receiver.rb', line 191 def requeue_parameters(opts={}) .completion do || .merge!(opts) end end |
#setup_reply_queue(reply_queue_name, &blk) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/smith/messaging/receiver.rb', line 69
#
# Yields a Sender for the named reply queue, creating and caching it on
# first use.
#
# If a sender for +reply_queue_name+ is already cached in +@reply_queues+ it
# is yielded immediately. Otherwise, once the exchange completion fires, a
# new Smith::Messaging::Sender is built for the reply queue, stored in the
# cache and then yielded.
#
# @param reply_queue_name [Object] key used for the cache and queue definition.
# @yield [sender] the cached or newly created sender.
def setup_reply_queue(reply_queue_name, &blk)
  cached_sender = @reply_queues[reply_queue_name]
  return blk.call(cached_sender) if cached_sender

  # The yielded exchange is not used; the completion only gates the setup.
  @exchange_completion.completion do |_exchange|
    logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

    definition = QueueDefinition.new(
      reply_queue_name,
      :auto_delete => true,
      :immediate => true,
      :mandatory => true,
      :durable => false
    )

    Smith::Messaging::Sender.new(definition) do |sender|
      @reply_queues[reply_queue_name] = sender
      blk.call(sender)
    end
  end
end
#status(&blk) ⇒ Object
183 184 185 186 187 188 189 |
# NOTE(review): the first parameter of the `queue.status` block and the first
# argument of `blk.call` are missing from this extraction (`|, num_consumers|`
# and `blk.call(, num_consumers)`). As shown, the method waits for the queue
# completion, calls queue.status and forwards both yielded values to blk;
# confirm the missing name against lib/smith/messaging/receiver.rb.
# File 'lib/smith/messaging/receiver.rb', line 183 def status(&blk) @queue_completion.completion do |queue| queue.status do |, num_consumers| blk.call(, num_consumers) end end end |
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options set :ack to false.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# NOTE(review): several identifiers are missing from this extraction (the
# receiver and parameter of the inner `.completion do ||`, the receiver of
# `.subscribe` in `opts = .subscribe`, the first parameter of the
# `queue.subscribe` block, and the name of the helper called as
# `(, payload, , &blk)`). Recover them from lib/smith/messaging/receiver.rb
# before editing.
#
# What the visible text shows: blk is replaced by handler when a handler is
# given; once the queue completion fires the queue is subscribed only if
# queue.subscribed? is false, otherwise an error is logged; messages with a
# payload are handed on (with blk) to a helper, and a nil payload is only
# logged at verbose level.
# File 'lib/smith/messaging/receiver.rb', line 89 def subscribe(handler=nil, &blk) blk = handler || blk @queue_completion.completion do |queue| .completion do || if !queue.subscribed? opts = .subscribe logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" } queue.subscribe(opts) do |,payload| if payload (, payload, , &blk) else logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" } end end else logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" } end end end end |
#unsubscribe(&blk) ⇒ Object
112 113 114 115 116 |
# File 'lib/smith/messaging/receiver.rb', line 112
#
# Unsubscribes from the queue once the queue completion fires.
#
# @yield passed straight through to +queue.unsubscribe+ (may be omitted).
def unsubscribe(&blk)
  @queue_completion.completion { |q| q.unsubscribe(&blk) }
end