Class: Smith::Messaging::Foo
- Inherits:
-
Object
- Object
- Smith::Messaging::Foo
- Includes:
- Logger
- Defined in:
- lib/smith/messaging/receiver.rb
Overview
This class gets passed into the receive block and is a representation of both the message and the message metadata. It also handles requeues and retries. In short it’s very much a convenience class which is why I have no idea what to call it!
Instance Attribute Summary collapse
-
#metadata ⇒ Object
Returns the value of attribute metadata.
Instance Method Summary collapse
-
#ack(multiple = false) ⇒ Object
(also: #call)
acknowledge the message.
-
#correlation_id ⇒ Object
the correlation_id.
-
#fail(acl = nil, opts = {:ack => true}) {|The| ... } ⇒ Object
Publish the ACL to the error queue set up for this queue.
-
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
constructor
A new instance of Foo.
-
#queue_name ⇒ String
Return the queue_name.
-
#reject(opts = {}) ⇒ Object
reject the message.
-
#remove_namespace(queue_name) ⇒ Object
Remove the Smith namespace prefix (the default is ‘smith.`).
-
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
-
#requeue ⇒ Object
Requeue the current mesage on the current queue.
-
#to_proc ⇒ Object
Make #call invoke ack.
Methods included from Logger
Constructor Details
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
Returns a new instance of Foo.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/smith/messaging/receiver.rb', line 248 def initialize(, data, opts={}, requeue_opts, &blk) @opts = opts @metadata = @reply_queue = @opts[:reply_queue] @requeue_opts = requeue_opts @acl_type_cache = AclTypeCache.instance @time = Time.now # TODO add some better error checking/diagnostics. clazz = @acl_type_cache.get_by_hash(.type) @message = clazz.new.parse_from_string(data) if @opts[:threading] EM.defer do blk.call(@message, self) ack if @opts[:auto_ack] end else blk.call(@message, self) ack if @opts[:auto_ack] end end |
Instance Attribute Details
#metadata ⇒ Object
Returns the value of attribute metadata.
246 247 248 |
# File 'lib/smith/messaging/receiver.rb', line 246 def @metadata end |
Instance Method Details
#ack(multiple = false) ⇒ Object Also known as: call
acknowledge the message.
294 295 296 |
# File 'lib/smith/messaging/receiver.rb', line 294 def ack(multiple=false) @metadata.ack(multiple) end |
#correlation_id ⇒ Object
the correlation_id
350 351 352 |
# File 'lib/smith/messaging/receiver.rb', line 350 def correlation_id @metadata.correlation_id end |
#fail(acl = nil, opts = {:ack => true}) {|The| ... } ⇒ Object
Publish the ACL to the error queue set up for this queue. This method is only available if the :error_queue option is set to true. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/smith/messaging/receiver.rb', line 316 def fail(acl=nil, opts={:ack => true}, &blk) if @opts[:error_queue] = (acl) ? acl : @message Sender.new("#{queue_name}.error") do |queue| logger.debug { "Republishing ACL to error queue: \"#{queue.queue_name}\"" } queue.publish() do queue. do |count| @metadata.ack if opts[:ack] blk && blk && blk.call(count + 1) end end end else raise ArgumentError, "You cannot fail this queue as you haven't specified the :error_queue option" end end |
#queue_name ⇒ String
Return the queue_name. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.
360 361 362 363 364 365 366 367 368 369 |
# File 'lib/smith/messaging/receiver.rb', line 360 def queue_name @a561facf ||= begin queue_name = @metadata.channel.queues.keys.detect { |q| q == @metadata.exchange } if queue_name @a561facf = remove_namespace(queue_name) else raise UnknownQueue, "Queue not found. You are probably you are using fanout queues: #{remove_namespace(@metadata.exchange)}" end end end |
#reject(opts = {}) ⇒ Object
reject the message. Optionally requeuing it.
345 346 347 |
# File 'lib/smith/messaging/receiver.rb', line 345 def reject(opts={}) @metadata.reject(opts) end |
#remove_namespace(queue_name) ⇒ Object
Remove the Smith namespace prefix (the default is ‘smith.`)
377 378 379 |
# File 'lib/smith/messaging/receiver.rb', line 377 def remove_namespace(queue_name) queue_name.gsub(/^#{Smith.config.smith.namespace}\./, '') end |
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
275 276 277 278 279 280 281 282 283 284 |
# File 'lib/smith/messaging/receiver.rb', line 275 def reply(acl=nil, &blk) raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil? if @metadata.reply_to @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.) else logger.error { "Cannot reply to a message that has no reply_to: #{@metadata.exchange}." } end end |
#requeue ⇒ Object
Requeue the current mesage on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.
289 290 291 |
# File 'lib/smith/messaging/receiver.rb', line 289 def requeue Requeue.new(@message, @metadata, @requeue_opts).requeue end |
#to_proc ⇒ Object
Make #call invoke ack. This makes the following idiom possible:
receiver(‘queue’).subscribe do |payload, receiver|
blah(payload, &receiver)
end
which will ensure that #ack is called properly.
340 341 342 |
# File 'lib/smith/messaging/receiver.rb', line 340 def to_proc proc { |obj| ack(obj) } end |