Module: MessageQueue::Consumable

Includes:
Logging
Defined in:
lib/message_queue/consumable.rb

Overview

A module to mix into a consumer class, for example:

# Example consumer: including the module extends this class with
# MessageQueue::Consumable::ClassMethods and registers it with MessageQueue.
class Consumer

include MessageQueue::Consumable

# Queue and exchange settings for this consumer (presumably what an instance
# later reads back via queue_options / exchange_options -- confirm against
# ClassMethods).
queue :name => "print_time_now"
exchange :name => "time", :routing_key => "time.#"

# Called with a MessageQueue::Message instance for each received message.
def process(message)
  ...
end

end

The consumer class must implement a process method, which is passed a MessageQueue::Message instance each time a message is received.

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Class Method Details

.included(base) ⇒ Object



22
23
24
25
# File 'lib/message_queue/consumable.rb', line 22

# Mixin hook invoked when a class includes this module.
#
# Extends the including class with ClassMethods, then hands that class to
# MessageQueue.register_consumable.
#
# @param base [Class] the class that included this module
# @return [Object] whatever MessageQueue.register_consumable returns
def self.included(base)
  base.extend ClassMethods
  MessageQueue.register_consumable base
end

Instance Method Details

#initialize ⇒ Object



53
54
55
56
57
# File 'lib/message_queue/consumable.rb', line 53

# Builds the underlying consumer for this instance.
#
# The queue, exchange and subscribe options are read from the including
# class (queue_options, exchange_options, subscribe_options) and passed to
# MessageQueue.new_consumer; the result is stored in @consumer.
def initialize
  consumable = self.class
  @consumer = MessageQueue.new_consumer(
    :queue => consumable.queue_options,
    :exchange => consumable.exchange_options,
    :subscribe => consumable.subscribe_options
  )
end

#subscribe(options = {}) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/message_queue/consumable.rb', line 59

# Subscribes the underlying consumer and dispatches each delivered message
# to #process.
#
# Each message is logged at info level before it is processed. Any
# StandardError raised while logging or processing is rescued and passed to
# handle_error along with the message and the underlying consumer.
#
# @param options [Hash] forwarded unchanged to the underlying consumer's
#   subscribe call
# @return [Object] whatever the underlying consumer's subscribe call returns
def subscribe(options = {})
  @consumer.subscribe(options) do |message|
    begin
      # Adjacent string literals are joined by the parser, so the log line is
      # built as one string instead of a chain of `+` concatenations.
      logger.info("Message(#{message.message_id || '-'}): " \
                  "routing key: #{message.routing_key}, " \
                  "type: #{message.type}, " \
                  "timestamp: #{message.timestamp}, " \
                  "consumer: #{@consumer.class}, " \
                  "payload: #{message.payload}")
      process(message)
    rescue StandardError => ex
      # Pass the @consumer ivar explicitly: there is no `consumer` local in
      # this scope, and a NameError raised here would mask the original
      # exception. (handle_error is defined outside this block.)
      handle_error(message, @consumer, ex)
    end
  end
end