Class: Smith::Messaging::Receiver
- Inherits:
-
Object
- Object
- Smith::Messaging::Receiver
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Method Summary collapse
- #ack(multiple = false) ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
constructor
A new instance of Receiver.
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
- #on_requeue(&blk) ⇒ Object
- #on_requeue_limit(&blk) ⇒ Object
-
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block.
- #queue_name ⇒ Object
- #requeue_parameters(opts = {}) ⇒ Object
- #setup_reply_queue(reply_queue_name, &blk) ⇒ Object
- #status(&blk) ⇒ Object
-
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block.
- #unsubscribe(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Receiver
Returns a new instance of Receiver.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# Constructor listing for Receiver.
#
# What the visible code does:
# * wraps +queue_def+ in a QueueDefinition (passing +opts+) unless it already is one;
# * creates four EM::Completion objects (channel, queue, exchange, requeue options)
#   plus an empty @reply_queues hash;
# * in the block of a first open_channel call, fulfils @channel_completion and declares
#   a :fanout or :direct exchange, fulfilling @exchange_completion;
# * in the block of a second open_channel call, declares the queue, binds it to the
#   exchange with the normalised queue name as routing key, then fulfils
#   @queue_completion and @requeue_options_completion (the latter with a hash of
#   :exchange and :queue);
# * finally calls +blk+ with the receiver itself when a block was given.
#
# NOTE(review): this listing has lost identifiers in extraction -- see `@queue_def.,`,
# the bare ` = if opts.delete(:fanout)`, `exchange_type = ()` and
# `fanout_queue_and_opts()`. Confirm against lib/smith/messaging/receiver.rb before
# relying on it.
# NOTE(review): @channel_completion is fulfilled from the first open_channel block while
# the queue is declared in the second one -- confirm whether both blocks receive the
# same channel.
# File 'lib/smith/messaging/receiver.rb', line 13 def initialize(queue_def, opts={}, &blk) # This is for backward compatibility. @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts) @acl_type_cache = AclTypeCache.instance @foo_options = { :error_queue => opts.delete(:error_queue) { false }, :auto_ack => option_or_default(@queue_def., :auto_ack, true), :threading => option_or_default(@queue_def., :threading, false)} = if opts.delete(:fanout) {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)} end @payload_type = Array(option_or_default(@queue_def., :type, [])) prefetch = option_or_default(@queue_def., :prefetch, Smith.config.agent.prefetch) @options = AmqpOptions.new(@queue_def.) @options.routing_key = @queue_def.normalise @message_counter = MessageCounter.new(@queue_def.denormalise) @channel_completion = EM::Completion.new @queue_completion = EM::Completion.new @exchange_completion = EM::Completion.new @requeue_options_completion = EM::Completion.new @reply_queues = {} open_channel(:prefetch => prefetch) do |channel| @channel_completion.succeed(channel) exchange_type = () ? :fanout : :direct channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange| @exchange_completion.succeed(exchange) end end open_channel(:prefetch => prefetch) do |channel| channel.queue(*fanout_queue_and_opts()) do |queue| @exchange_completion.completion do |exchange| queue.bind(exchange, :routing_key => @queue_def.normalise) @queue_completion.succeed(queue) @requeue_options_completion.succeed(:exchange => exchange, :queue => queue) end end end blk.call(self) if blk end |
Instance Method Details
#ack(multiple = false) ⇒ Object
66 67 68 |
# File 'lib/smith/messaging/receiver.rb', line 66
#
# Calls +ack+ on the channel held by @channel_completion, forwarding +multiple+.
# The call is made from the completion callback on @channel_completion.
#
# @param multiple [Boolean] passed unchanged to +channel.ack+ (defaults to false).
def ack(multiple=false)
  @channel_completion.completion do |channel|
    channel.ack(multiple)
  end
end
#delete(&blk) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/smith/messaging/receiver.rb', line 168
#
# Tears the receiver down in a fixed order once the exchange, queue and channel
# completions have all fired: unbind the queue from the exchange, delete the
# queue, delete the exchange, then close the channel.
#
# Each step runs inside the callback of the previous one, so the order is
# preserved. +blk+ is handed to +channel.close+.
def delete(&blk)
  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion do |channel|
        queue.unbind(exchange) do
          queue.delete do
            exchange.delete do
              channel.close(&blk)
            end
          end
        end
      end
    end
  end
end
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
138 139 140 141 142 143 |
# File 'lib/smith/messaging/receiver.rb', line 138
#
# Define a channel error handler: registers +blk+ via +channel.on_error+ once
# @channel_completion has fired.
#
# @param chain [Boolean] accepted for interface compatibility; the visible code
#   does not use it.
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion do |channel|
    channel.on_error(&blk)
  end
end
#on_requeue(&blk) ⇒ Object
198 199 200 201 202 |
# File 'lib/smith/messaging/receiver.rb', line 198
#
# Stores +blk+ under the :on_requeue key of the object yielded by
# @requeue_options_completion.
#
# The listing had lost both the block parameter and the receiver of +merge!+
# (`do || .merge!(...)`); a block parameter is restored here so the merge has
# a target.
def on_requeue(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue => blk)
  end
end
#on_requeue_limit(&blk) ⇒ Object
204 205 206 207 208 |
# File 'lib/smith/messaging/receiver.rb', line 204
#
# Stores +blk+ under the :on_requeue_limit key of the object yielded by
# @requeue_options_completion.
#
# The listing had lost both the block parameter and the receiver of +merge!+
# (`do || .merge!(...)`); a block parameter is restored here so the merge has
# a target.
def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue_limit => blk)
  end
end
#pop(&blk) ⇒ Object
Pops a message off the queue and passes the headers and payload into the block. pop
will automatically acknowledge the message unless the options set :ack to false.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# Listing for #pop.
#
# What the visible code does: reads the pop options from @options, waits for the
# queue and requeue-options completions, then calls queue.pop with those options.
# A non-nil payload is forwarded (together with +blk+) to another method; when
# the payload is nil, +blk+ is called with (nil, nil).
#
# NOTE(review): this listing has lost identifiers in extraction -- the block
# parameters in `do ||` and `|, payload|`, and the name of the method called as
# `(, payload, , &blk)`. Confirm against lib/smith/messaging/receiver.rb.
# File 'lib/smith/messaging/receiver.rb', line 122 def pop(&blk) opts = @options.pop @queue_completion.completion do |queue| @requeue_options_completion.completion do || queue.pop(opts) do |, payload| if payload (, payload, , &blk) else blk.call(nil,nil) end end end end end |
#queue_name ⇒ Object
164 165 166 |
# File 'lib/smith/messaging/receiver.rb', line 164
#
# Returns the result of +denormalise+ on the receiver's queue definition.
def queue_name
  @queue_def.denormalise
end
#requeue_parameters(opts = {}) ⇒ Object
192 193 194 195 196 |
# File 'lib/smith/messaging/receiver.rb', line 192
#
# Merges +opts+ into the object yielded by @requeue_options_completion.
#
# The listing had lost both the block parameter and the receiver of +merge!+
# (`do || .merge!(opts)`); a block parameter is restored here so the merge has
# a target.
#
# @param opts [Hash] entries to merge in; keys already present are overwritten.
def requeue_parameters(opts={})
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(opts)
  end
end
#setup_reply_queue(reply_queue_name, &blk) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/smith/messaging/receiver.rb', line 70
#
# Yields a Sender attached to +reply_queue_name+, creating and caching it in
# @reply_queues on first use.
#
# A cached sender is yielded straight away. Otherwise, once @exchange_completion
# has fired, a Smith::Messaging::Sender is created for a queue definition with
# :auto_delete, :immediate and :mandatory set to true and :durable set to false;
# it is then cached and yielded.
#
# The block is optional: with no block the sender is still created and cached.
# This matches the `blk.call(self) if blk` guard used by the constructor.
#
# @param reply_queue_name [Object] key used both for the cache and the queue definition.
def setup_reply_queue(reply_queue_name, &blk)
  cached_sender = @reply_queues[reply_queue_name]

  if cached_sender
    blk.call(cached_sender) if blk
  else
    # The exchange itself is not used here; the completion only delays the work.
    @exchange_completion.completion do |_exchange|
      logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

      reply_queue_def = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

      Smith::Messaging::Sender.new(reply_queue_def) do |sender|
        @reply_queues[reply_queue_name] = sender
        blk.call(sender) if blk
      end
    end
  end
end
#status(&blk) ⇒ Object
184 185 186 187 188 189 190 |
# File 'lib/smith/messaging/receiver.rb', line 184
#
# Asks the queue for its status once @queue_completion has fired and passes
# both values it yields on to +blk+, in the same order.
#
# The listing had lost the first block parameter (`|, num_consumers|`); it is
# restored as +num_messages+.
# NOTE(review): the name is assumed from the Util#number_of_messages helper
# listed for this class -- confirm against the original source.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end
#subscribe(handler = nil, &blk) ⇒ Object
Subscribes to a queue and passes the headers and payload into the block. subscribe
will automatically acknowledge the message unless the options set :ack to false.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# Listing for #subscribe.
#
# What the visible code does: uses +handler+ in preference to the block when both
# are given; waits for the queue and requeue-options completions; if the queue is
# not yet subscribed, reads the subscribe options from @options, logs them at
# debug level and calls queue.subscribe. A non-nil payload is forwarded (together
# with the handler) to another method; a nil payload is only logged at verbose
# level. If the queue is already subscribed, an error is logged and nothing else
# happens.
#
# NOTE(review): this listing has lost identifiers in extraction -- the block
# parameters in `do ||` and `|, payload|`, and the name of the method called as
# `(, payload, , &blk)`. Confirm against lib/smith/messaging/receiver.rb.
# File 'lib/smith/messaging/receiver.rb', line 90 def subscribe(handler=nil, &blk) blk = handler || blk @queue_completion.completion do |queue| @requeue_options_completion.completion do || if !queue.subscribed? opts = @options.subscribe logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" } queue.subscribe(opts) do |, payload| if payload (, payload, , &blk) else logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" } end end else logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" } end end end end |
#unsubscribe(&blk) ⇒ Object
113 114 115 116 117 |
# File 'lib/smith/messaging/receiver.rb', line 113
#
# Calls +unsubscribe+ on the queue once @queue_completion has fired, handing
# +blk+ (which may be absent) to it.
def unsubscribe(&blk)
  @queue_completion.completion do |queue|
    queue.unsubscribe(&blk)
  end
end