Class: AMQP::Boilerplate::Consumer

Inherits:
Object
Defined in:
lib/amqp/boilerplate/consumer.rb

Overview

Inherit from this class to turn a class into a consumer that can handle messages delivered to it by an AMQP broker.

You must call the Consumer.amqp_queue macro and implement #handle_message.

To specify subscription options, call the optional Consumer.amqp_subscription macro.

Examples:

Basic consumer

class MyConsumer < AMQP::Boilerplate::Consumer
  amqp_queue "hello.world"

  def handle_message(metadata, payload)
    puts "Received message: #{payload}"
  end
end

Configuring subscription

class MyConsumer < AMQP::Boilerplate::Consumer
  amqp_queue "queue.name.here", :durable => true
  amqp_subscription :ack => true

  def handle_message(metadata, payload)
    puts "Received message: #{payload}"
  end
end

Class Method Details

.amqp_exchange(name, options = {}) ⇒ void

This method returns an undefined value.

Macro for selecting the exchange to bind the queue to.

Parameters:

  • name (String)

    Exchange name

  • options (Hash) (defaults to: {})

    Options that will be passed as options to AMQP::Queue#bind



# File 'lib/amqp/boilerplate/consumer.rb', line 35

def amqp_exchange(name, options = {})
  @exchange_name = name
  @exchange_options = options
end

.amqp_queue(name = AMQ::Protocol::EMPTY_STRING, options = {}) ⇒ void

This method returns an undefined value.

Macro that sets up the amqp_queue for a class.

Parameters:

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. If you want a server-named queue, you can omit the name.

  • options (Hash) (defaults to: {})

    Options that will be passed as options to AMQP::Channel#queue



# File 'lib/amqp/boilerplate/consumer.rb', line 45

def amqp_queue(name=AMQ::Protocol::EMPTY_STRING, options={})
  @queue_name = name
  @queue_options = options
  AMQP::Boilerplate.register_consumer(self)
end
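
The two macros above only record their arguments in class-level instance variables, and amqp_queue additionally registers the class. As a rough illustration of that pattern, the following sketch runs without the amqp gem. SketchRegistry, SketchConsumer and the two subclasses are hypothetical stand-ins, not part of AMQP::Boilerplate, and the option hashes are stored without being validated.

```ruby
# Hypothetical stand-in for the registry behind AMQP::Boilerplate.register_consumer.
module SketchRegistry
  def self.consumers
    @consumers ||= []
  end

  def self.register_consumer(klass)
    consumers << klass
  end
end

# Hypothetical stand-in for AMQP::Boilerplate::Consumer (macros only).
class SketchConsumer
  class << self
    attr_reader :queue_name, :queue_options, :exchange_name, :exchange_options

    def amqp_exchange(name, options = {})
      @exchange_name = name
      @exchange_options = options
    end

    def amqp_queue(name = "", options = {})
      @queue_name = name
      @queue_options = options
      SketchRegistry.register_consumer(self)
    end
  end
end

class OrdersConsumer < SketchConsumer
  amqp_exchange "orders", :routing_key => "orders.created"
  amqp_queue "orders.created", :durable => true
end

class AuditConsumer < SketchConsumer
  amqp_queue # name omitted: the empty default asks for a server-named queue
end
```

Because the settings live on each subclass rather than on the base class, OrdersConsumer and AuditConsumer keep separate queue and exchange configuration.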

.amqp_subscription(options = {}) ⇒ Object

Macro that subscribes to asynchronous message delivery.

Parameters:

  • options (Hash) (defaults to: {})

    Options that will be passed as options to AMQP::Queue#subscribe



# File 'lib/amqp/boilerplate/consumer.rb', line 54

def amqp_subscription(options={})
  @subscription_options = options
end
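
When a subscription is configured with :ack => true, the broker expects each message to be acknowledged, which in the amqp gem is typically done by calling ack on the metadata object. The sketch below shows the shape of such a handler. FakeMetadata and AckingHandler are hypothetical stand-ins so that it runs without a broker. A real consumer would inherit from AMQP::Boilerplate::Consumer instead.

```ruby
# Hypothetical stand-in for the metadata object; only #ack is modelled.
class FakeMetadata
  attr_reader :acked

  def initialize
    @acked = false
  end

  def ack
    @acked = true
  end
end

# Hypothetical handler, shaped like a Consumer subclass's #handle_message.
class AckingHandler
  def handle_message(metadata, payload)
    puts "Received message: #{payload}"
    metadata.ack # acknowledge only after processing has succeeded
  end
end

metadata = FakeMetadata.new
AckingHandler.new.handle_message(metadata, "hello")
```

Acknowledging at the end of the handler means that a message whose processing raised an exception stays unacknowledged.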

.start ⇒ Object



# File 'lib/amqp/boilerplate/consumer.rb', line 58

def start
  consumer = new

  channel = AMQP.channel
  channel.on_error(&consumer.method(:handle_channel_error))
  channel.prefetch(AMQP::Boilerplate.consumer_prefetch)

  queue = channel.queue(@queue_name, @queue_options)
  # Binding a queue to an exchange by passing a string (instead of an AMQP::Exchange instance)
  queue.bind(@exchange_name, @exchange_options) if @exchange_name
  queue.subscribe(@subscription_options, &consumer.method(:handle_message_wrapper))

  AMQP::Boilerplate.logger.info("[#{self.name}.start] Started consumer '#{self.name}'")
end
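
The start method hands `&consumer.method(:handle_message_wrapper)` to subscribe, relying on Ruby turning a bound Method object into a block. A minimal sketch of that mechanism follows. FakeQueue and EchoConsumer are hypothetical stand-ins, and the fake queue simply replays canned messages instead of talking to a broker.

```ruby
# Hypothetical stand-in for a queue: delivers canned messages to the block.
class FakeQueue
  def initialize(messages)
    @messages = messages
  end

  # Calls the block with (metadata, payload), the argument order the
  # consumer's instance methods use.
  def subscribe(options = {}, &block)
    @messages.each { |metadata, payload| block.call(metadata, payload) }
  end
end

# Hypothetical consumer that records what it receives.
class EchoConsumer
  attr_reader :received

  def initialize
    @received = []
  end

  def handle_message_wrapper(metadata, payload)
    @received << [metadata, payload]
  end
end

consumer = EchoConsumer.new
queue = FakeQueue.new([[{ :routing_key => "a" }, "first"],
                       [{ :routing_key => "b" }, "second"]])

# `&` calls Method#to_proc, so the bound method is used as the block.
queue.subscribe({ :ack => true }, &consumer.method(:handle_message_wrapper))
```

Since the Method object stays bound to its receiver, every delivery is handled by the same consumer instance created at the top of start.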

Instance Method Details

#handle_channel_error(channel, channel_close) ⇒ Object



# File 'lib/amqp/boilerplate/consumer.rb', line 74

def handle_channel_error(channel, channel_close)
  AMQP::Boilerplate.logger.error("[#{self.class}#handle_channel_error] Code = #{channel_close.reply_code}, message = #{channel_close.reply_text}")
end

#handle_message(metadata, payload) ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/amqp/boilerplate/consumer.rb', line 78

def handle_message(metadata, payload)
  raise NotImplementedError, "The time has come to implement your own consumer class. Good luck!"
end

#handle_message_wrapper(metadata, payload) ⇒ Object

Wrapper around the message handling routine that prevents the consumer from being killed when an exception occurs.

Catches anything that quacks like a StandardError. SystemExits, SyntaxErrors and the like will still cause the consumer to be aborted. See Ruby’s exception inheritance hierarchy for a complete list of what is and what is not handled by this wrapper.



# File 'lib/amqp/boilerplate/consumer.rb', line 89

def handle_message_wrapper(metadata, payload)
  AMQP::Boilerplate.logger.debug("[#{self.class}#handle_message_wrapper] Received message: #{payload}")
  handle_message(metadata, payload)
rescue StandardError => e
  if AMQP::Boilerplate.on_unhandled_consumer_exception.is_a?(Proc)
    AMQP::Boilerplate.on_unhandled_consumer_exception.call(e, self, metadata, payload)
  else
    raise e
  end
end
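
The description of #handle_message_wrapper distinguishes StandardError from the rest of Ruby's exception hierarchy. The standalone sketch below illustrates which side of that line a few common exceptions fall on. The helper rescued_by_standard_error? is hypothetical and exists only for this illustration.

```ruby
# Hypothetical helper: reports whether a StandardError rescue caught the block's exception.
def rescued_by_standard_error?
  yield
  false
rescue StandardError
  true
end

# Descendants of StandardError are caught.
rescued_by_standard_error? { raise ArgumentError, "bad payload" }
rescued_by_standard_error? { raise "plain RuntimeError" }

# ScriptError descendants (SyntaxError, NotImplementedError) and SystemExit
# sit outside StandardError, so they propagate to the caller.
escaped = begin
  rescued_by_standard_error? { raise NotImplementedError, "no handler" }
rescue ScriptError => e
  e.class
end
```

Note that NotImplementedError descends from ScriptError, so the error raised by the default #handle_message is among those a StandardError rescue does not catch.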