Class: AMQP::Boilerplate::Consumer
- Inherits: Object
- Defined in: lib/amqp/boilerplate/consumer.rb
Overview
Inherit from this class to turn a class into a consumer that can handle messages delivered to it by an AMQP broker.
Call the Consumer.amqp_queue macro and implement #handle_message.
To specify subscription options, call the optional Consumer.amqp_subscription macro.
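The usage described above can be sketched as follows. So that the snippet runs without the gem or a broker, it defines SketchConsumer, a hypothetical stand-in that only mirrors the bookkeeping of the macros documented on this page; with the real library the subclass would presumably inherit from AMQP::Boilerplate::Consumer instead. The queue name, the option hashes and the payload are illustrative assumptions.

```ruby
# Stand-in for AMQP::Boilerplate::Consumer so the sketch runs without the gem
# or a broker. It only mirrors how the macros store their arguments; it is
# NOT the real class and does not register or start anything.
class SketchConsumer
  class << self
    attr_reader :queue_name, :queue_options, :subscription_options

    def amqp_queue(name = "", options = {})
      @queue_name = name
      @queue_options = options
    end

    def amqp_subscription(options = {})
      @subscription_options = options
    end
  end

  def handle_message(metadata, payload)
    raise NotImplementedError, "implement #handle_message in a subclass"
  end
end

# Hypothetical consumer: the queue name and options are made up.
class OrderConsumer < SketchConsumer
  amqp_queue "orders", :durable => true
  amqp_subscription :ack => true

  def handle_message(metadata, payload)
    "processed #{payload}"
  end
end

puts OrderConsumer.queue_name
puts OrderConsumer.new.handle_message(nil, "order-1")
```

Because the macros assign class-level instance variables, each consumer class keeps its own settings; in this sketch SketchConsumer.queue_name stays nil after OrderConsumer is defined.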
Class Method Summary
- .amqp_exchange(name, options = {}) ⇒ void
  Macro for selecting the exchange to bind to.
- .amqp_queue(name = AMQ::Protocol::EMPTY_STRING, options = {}) ⇒ void
  Macro that sets up the amqp_queue for a class.
- .amqp_subscription(options = {}) ⇒ Object
  Macro that subscribes to asynchronous message delivery.
- .start ⇒ Object
Instance Method Summary
- #handle_channel_error(channel, channel_close) ⇒ Object
- #handle_message(metadata, payload) ⇒ Object
- #handle_message_wrapper(metadata, payload) ⇒ Object
  Wrapper around message handling routine to prevent the consumer from being killed when an exception occurs.
Class Method Details
.amqp_exchange(name, options = {}) ⇒ void
This method returns an undefined value.
Macro for selecting the exchange to bind to.
    # File 'lib/amqp/boilerplate/consumer.rb', line 35
    def amqp_exchange(name, options = {})
      @exchange_name = name
      @exchange_options = options
    end
.amqp_queue(name = AMQ::Protocol::EMPTY_STRING, options = {}) ⇒ void
This method returns an undefined value.
Macro that sets up the amqp_queue for a class.
    # File 'lib/amqp/boilerplate/consumer.rb', line 45
    def amqp_queue(name=AMQ::Protocol::EMPTY_STRING, options={})
      @queue_name = name
      @queue_options = options
      AMQP::Boilerplate.register_consumer(self)
    end
.amqp_subscription(options = {}) ⇒ Object
Macro that subscribes to asynchronous message delivery.
    # File 'lib/amqp/boilerplate/consumer.rb', line 54
    def amqp_subscription(options={})
      @subscription_options = options
    end
.start ⇒ Object
    # File 'lib/amqp/boilerplate/consumer.rb', line 58
    def start
      consumer = new

      channel = AMQP.channel
      channel.on_error(&consumer.method(:handle_channel_error))
      channel.prefetch(AMQP::Boilerplate.consumer_prefetch)

      queue = channel.queue(@queue_name, @queue_options)
      # Binding a queue to an exchange by passing a string (instead of an AMQP::Exchange instance)
      queue.bind(@exchange_name, @exchange_options) if @exchange_name
      queue.subscribe(@subscription_options, &consumer.method(:handle_message_wrapper))

      AMQP::Boilerplate.logger.info("[#{self.name}.start] Started consumer '#{self.name}'")
    end
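The listing above hands instance methods to the channel and the queue with the &consumer.method(...) idiom, which converts a bound Method object into a block. A minimal sketch of that idiom follows; FakeQueue and EchoConsumer are hypothetical stand-ins written for this example and are not part of the amqp gem.

```ruby
# Hypothetical stand-in for an AMQP queue: it stores the block given to
# #subscribe and replays it when a "message" is delivered by hand.
class FakeQueue
  def subscribe(options = {}, &block)
    @callback = block
  end

  def deliver(metadata, payload)
    @callback.call(metadata, payload)
  end
end

# Hypothetical consumer that just records the payloads it receives.
class EchoConsumer
  attr_reader :received

  def initialize
    @received = []
  end

  def handle_message_wrapper(metadata, payload)
    @received << payload
  end
end

consumer = EchoConsumer.new
queue = FakeQueue.new

# Same idiom as in .start: the bound Method object becomes the block,
# so every delivery ends up calling the method on this one instance.
queue.subscribe({}, &consumer.method(:handle_message_wrapper))
queue.deliver(nil, "hello")

puts consumer.received.inspect
```

One consequence visible in the listing is that .start creates a single consumer instance with new, so all deliveries on that subscription go through the same object.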
Instance Method Details
#handle_channel_error(channel, channel_close) ⇒ Object
    # File 'lib/amqp/boilerplate/consumer.rb', line 74
    def handle_channel_error(channel, channel_close)
      AMQP::Boilerplate.logger.error("[#{self.class}#handle_channel_error] Code = #{channel_close.reply_code}, message = #{channel_close.reply_text}")
    end
#handle_message(metadata, payload) ⇒ Object
    # File 'lib/amqp/boilerplate/consumer.rb', line 78
    def handle_message(metadata, payload)
      raise NotImplementedError, "The time has come to implement your own consumer class. Good luck!"
    end
#handle_message_wrapper(metadata, payload) ⇒ Object
Wrapper around message handling routine to prevent the consumer from being killed when an exception occurs.
Catches anything that quacks like a StandardError. SystemExits, SyntaxErrors and the like will still cause the consumer to be aborted. See Ruby’s exception inheritance hierarchy for a complete list of what is and what is not handled by this wrapper.
    # File 'lib/amqp/boilerplate/consumer.rb', line 89
    def handle_message_wrapper(metadata, payload)
      AMQP::Boilerplate.logger.debug("[#{self.class}#handle_message_wrapper] Received message: #{payload}")
      handle_message(metadata, payload)
    rescue Exception => e
      if AMQP::Boilerplate.on_unhandled_consumer_exception.is_a?(Proc)
        AMQP::Boilerplate.on_unhandled_consumer_exception.call(e, self, metadata, payload)
      else
        raise e
      end
    end
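The two branches of the wrapper can be tried out in isolation with the sketch below. SketchConfig is a hypothetical stand-in for the module-level on_unhandled_consumer_exception setting read in the listing; how the real library lets you assign that setting is not shown on this page, so the accessor here is an assumption. FailingConsumer and its payload are likewise made up.

```ruby
# Hypothetical stand-in for the module-level setting that the wrapper reads.
module SketchConfig
  class << self
    attr_accessor :on_unhandled_consumer_exception
  end
end

class FailingConsumer
  def handle_message(metadata, payload)
    raise ArgumentError, "cannot parse #{payload}"
  end

  # Same control flow as the wrapper above, pointed at the stand-in config.
  def handle_message_wrapper(metadata, payload)
    handle_message(metadata, payload)
  rescue Exception => e
    handler = SketchConfig.on_unhandled_consumer_exception
    if handler.is_a?(Proc)
      handler.call(e, self, metadata, payload)
    else
      raise e
    end
  end
end

# Branch 1: a handler is configured, so the error is passed to it
# and does not propagate to the caller.
reported = []
SketchConfig.on_unhandled_consumer_exception = lambda do |error, consumer, metadata, payload|
  reported << [error.class, consumer.class, payload]
end
FailingConsumer.new.handle_message_wrapper(nil, "bad-payload")

# Branch 2: no handler is configured, so the exception is re-raised.
SketchConfig.on_unhandled_consumer_exception = nil
begin
  FailingConsumer.new.handle_message_wrapper(nil, "bad-payload")
rescue ArgumentError => e
  reraised = e.message
end
```

The handler receives the exception, the consumer instance, the metadata and the payload, in that order, matching the call in the listing; a lambda works because it is also a Proc.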