Class: Smith::Messaging::Foo
- Inherits:
-
Object
- Object
- Smith::Messaging::Foo
- Includes:
- Logger
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Attribute Summary collapse
-
#metadata ⇒ Object
Returns the value of attribute metadata.
Instance Method Summary collapse
-
#ack(multiple = false) ⇒ Object
(also: #call)
acknowledge the message.
-
#correlation_id ⇒ Object
the correlation_id.
-
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
constructor
A new instance of Foo.
-
#reject(opts = {}) ⇒ Object
reject the message.
-
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
-
#requeue ⇒ Object
Requeue the current message on the current queue.
-
#to_proc ⇒ Object
Make #call invoke ack.
Methods included from Logger
Constructor Details
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
Returns a new instance of Foo.
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/smith/messaging/receiver.rb', line 210 def initialize(metadata, data, opts={}, requeue_opts, &blk) @metadata = metadata @reply_queue = opts[:reply_queue] @requeue_opts = requeue_opts @acl_type_cache = AclTypeCache.instance @time = Time.now # TODO add some better error checking/diagnostics. clazz = @acl_type_cache.get_by_hash(metadata.type) raise ACL::UnknownError, "Unknown ACL: #{metadata.type}" if clazz.nil? @message = clazz.new.parse_from_string(data) if opts[:threading] EM.defer do blk.call(@message, self) ack if opts[:auto_ack] end else blk.call(@message, self) ack if opts[:auto_ack] end end |
Instance Attribute Details
#metadata ⇒ Object
Returns the value of attribute metadata.
208 209 210 |
# File 'lib/smith/messaging/receiver.rb', line 208 def metadata @metadata end |
Instance Method Details
#ack(multiple = false) ⇒ Object Also known as: call
acknowledge the message.
256 257 258 |
# File 'lib/smith/messaging/receiver.rb', line 256 def ack(multiple=false) @metadata.ack(multiple) end |
#correlation_id ⇒ Object
the correlation_id
278 279 280 |
# File 'lib/smith/messaging/receiver.rb', line 278 def correlation_id @metadata.correlation_id end |
#reject(opts = {}) ⇒ Object
reject the message. Optionally requeuing it.
273 274 275 |
# File 'lib/smith/messaging/receiver.rb', line 273 def reject(opts={}) @metadata.reject(opts) end |
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
237 238 239 240 241 242 243 244 245 246 |
# File 'lib/smith/messaging/receiver.rb', line 237 def reply(acl=nil, &blk) raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil? if @metadata.reply_to @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.) else logger.error { "You are replying to a message that has no reply_to: #{@metadata.exchange}." } end end |
#requeue ⇒ Object
Requeue the current message on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.
251 252 253 |
# File 'lib/smith/messaging/receiver.rb', line 251 def requeue Requeue.new(@message, @metadata, @requeue_opts).requeue end |
#to_proc ⇒ Object
Make #call invoke ack. This makes the following idiom possible:
receiver('queue').subscribe do |payload, receiver|
blah(payload, &receiver)
end
which will ensure that #ack is called properly.
268 269 270 |
# File 'lib/smith/messaging/receiver.rb', line 268 def to_proc proc { |obj| ack(obj) } end |