Class: Smith::Messaging::Foo
- Inherits:
-
Object
- Object
- Smith::Messaging::Foo
- Includes:
- Logger
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Attribute Summary collapse
-
#metadata ⇒ Object
Returns the value of attribute metadata.
Instance Method Summary collapse
-
#ack(multiple = false) ⇒ Object
(also: #call)
acknowledge the message.
-
#correlation_id ⇒ Object
the correlation_id.
-
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
constructor
A new instance of Foo.
-
#reject(opts = {}) ⇒ Object
reject the message.
-
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
-
#requeue ⇒ Object
Requeue the current message on the current queue.
-
#to_proc ⇒ Object
Make #call invoke ack.
Methods included from Logger
Constructor Details
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
Returns a new instance of Foo.
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/smith/messaging/receiver.rb', line 242 def initialize(, data, opts={}, requeue_opts, &blk) = @reply_queue = opts[:reply_queue] @requeue_opts = requeue_opts @acl_type_cache = AclTypeCache.instance @time = Time.now # TODO add some better error checking/diagnostics. clazz = @acl_type_cache.get_by_hash(.type) = clazz.new.parse_from_string(data) if opts[:threading] EM.defer do blk.call(, self) ack if opts[:auto_ack] end else blk.call(, self) ack if opts[:auto_ack] end end |
Instance Attribute Details
#metadata ⇒ Object
Returns the value of attribute metadata.
240 241 242 |
# File 'lib/smith/messaging/receiver.rb', line 240 def metadata @metadata end |
Instance Method Details
#ack(multiple = false) ⇒ Object Also known as: call
acknowledge the message.
287 288 289 |
# File 'lib/smith/messaging/receiver.rb', line 287 def ack(multiple=false) @metadata.ack(multiple) end |
#correlation_id ⇒ Object
the correlation_id
309 310 311 |
# File 'lib/smith/messaging/receiver.rb', line 309 def correlation_id @metadata.correlation_id end |
#reject(opts = {}) ⇒ Object
reject the message. Optionally requeuing it.
304 305 306 |
# File 'lib/smith/messaging/receiver.rb', line 304 def reject(opts={}) @metadata.reject(opts) end |
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
268 269 270 271 272 273 274 275 276 277 |
# File 'lib/smith/messaging/receiver.rb', line 268 def reply(acl=nil, &blk) raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil? if .reply_to @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => .) else logger.error { "Cannot reply to a message that has no reply_to: #{@metadata.exchange}." } end end |
#requeue ⇒ Object
Requeue the current message on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.
282 283 284 |
# File 'lib/smith/messaging/receiver.rb', line 282 def requeue Requeue.new(, , @requeue_opts).requeue end |
#to_proc ⇒ Object
Make #call invoke ack. This makes the following idiom possible:
receiver('queue').subscribe do |payload, receiver|
blah(payload, &receiver)
end
which will ensure that #ack is called properly.
299 300 301 |
# File 'lib/smith/messaging/receiver.rb', line 299 def to_proc proc { |obj| ack(obj) } end |