Class: Smith::Messaging::Foo

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/smith/messaging/receiver.rb

Overview

This class gets passed into the receive block and is a representation of both the message and the message metadata. It also handles requeues and retries. In short it’s very much a convenience class which is why I have no idea what to call it!

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Constructor Details

#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo

Returns a new instance of Foo.



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/smith/messaging/receiver.rb', line 248

def initialize(, data, opts={}, requeue_opts, &blk)
  @opts = opts
  @metadata = 
  @reply_queue = @opts[:reply_queue]
  @requeue_opts = requeue_opts

  @acl_type_cache = AclTypeCache.instance

  @time = Time.now

  # TODO add some better error checking/diagnostics.
  clazz = @acl_type_cache.get_by_hash(.type)

  @message = clazz.new.parse_from_string(data)

  if @opts[:threading]
    EM.defer do
      blk.call(@message, self)
      ack if @opts[:auto_ack]
    end
  else
    blk.call(@message, self)
    ack if @opts[:auto_ack]
  end
end

Instance Attribute Details

#metadataObject

Returns the value of attribute metadata.



246
247
248
# File 'lib/smith/messaging/receiver.rb', line 246

def 
  @metadata
end

Instance Method Details

#ack(multiple = false) ⇒ Object Also known as: call

acknowledge the message.



294
295
296
# File 'lib/smith/messaging/receiver.rb', line 294

def ack(multiple=false)
  @metadata.ack(multiple)
end

#correlation_idObject

the correlation_id



350
351
352
# File 'lib/smith/messaging/receiver.rb', line 350

def correlation_id
  @metadata.correlation_id
end

#fail(acl = nil, opts = {:ack => true}) {|The| ... } ⇒ Object

Publish the ACL to the error queue set up for this queue. This method is only available if the :error_queue option is set to true. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.

Parameters:

  • acl (ACL) (defaults to: nil)

    Optional ACL. With any options this method will fail the entire ACL. This may not be what you want though. So this opton allows you to fail another ACL. WARNING: there is no type checking at the moment. If you publish an ACL that this agent can’t process and republish that ACL at a future date the agent will blow up.

  • opts (Hash) (defaults to: {:ack => true})

    Options hash. This currently only supports on option: :ack. If you publish a different ACL from the one received you will have to ack that message yourself and make sure ‘:ack => nil`

Yield Parameters:

  • The (Fixnum)

    number of ACLs on the error queue.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/smith/messaging/receiver.rb', line 316

def fail(acl=nil, opts={:ack => true}, &blk)
  if @opts[:error_queue]
    message = (acl) ? acl : @message
    Sender.new("#{queue_name}.error") do |queue|
      logger.debug { "Republishing ACL to error queue: \"#{queue.queue_name}\"" }
      queue.publish(message) do
        queue.number_of_messages do |count|
          @metadata.ack if opts[:ack]
          blk && blk && blk.call(count + 1)
        end
      end
    end
  else
    raise ArgumentError, "You cannot fail this queue as you haven't specified the :error_queue option"
  end
end

#queue_nameString

Return the queue_name. Note this is the receive queue name which in most cases is the same at the sender name but if you are using fanout queues it will be different.

Returns:

  • (String)

    the name of the queue



360
361
362
363
364
365
366
367
368
369
# File 'lib/smith/messaging/receiver.rb', line 360

def queue_name
  @a561facf ||= begin
                  queue_name = @metadata.channel.queues.keys.detect { |q| q == @metadata.exchange }
                  if queue_name
                    @a561facf = remove_namespace(queue_name)
                  else
                    raise UnknownQueue, "Queue not found. You are probably you are using fanout queues: #{remove_namespace(@metadata.exchange)}"
                  end
                end
end

#reject(opts = {}) ⇒ Object

reject the message. Optionally requeuing it.



345
346
347
# File 'lib/smith/messaging/receiver.rb', line 345

def reject(opts={})
  @metadata.reject(opts)
end

#remove_namespace(queue_name) ⇒ Object

Remove the Smith namespace prefix (the default is ‘smith.`)

Parameters:

  • queue_name (String)

    The name of the queue

  • The (String)

    name of the queue with the namespace prefix.



377
378
379
# File 'lib/smith/messaging/receiver.rb', line 377

def remove_namespace(queue_name)
  queue_name.gsub(/^#{Smith.config.smith.namespace}\./, '')
end

#reply(acl = nil, &blk) ⇒ Object

Send a message to the reply_to queue as specified in the message header.

Raises:

  • (ArgumentError)


275
276
277
278
279
280
281
282
283
284
# File 'lib/smith/messaging/receiver.rb', line 275

def reply(acl=nil, &blk)
  raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk
  raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil?

  if @metadata.reply_to
    @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.message_id)
  else
    logger.error { "Cannot reply to a message that has no reply_to: #{@metadata.exchange}." }
  end
end

#requeueObject

Requeue the current mesage on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.



289
290
291
# File 'lib/smith/messaging/receiver.rb', line 289

def requeue
  Requeue.new(@message, @metadata, @requeue_opts).requeue
end

#to_procObject

Make #call invoke ack. This makes the following idiom possible:

receiver(‘queue’).subscribe do |payload, receiver|

blah(payload, &receiver)

end

which will ensure that #ack is called properly.



340
341
342
# File 'lib/smith/messaging/receiver.rb', line 340

def to_proc
  proc { |obj| ack(obj) }
end