Class: RuoteAMQP::Listener
- Inherits:
-
OpenWFE::Service
- Object
- OpenWFE::Service
- RuoteAMQP::Listener
- Includes:
- OpenWFE::WorkItemListener
- Defined in:
- lib/ruote-amqp/listener.rb
Overview
AMQP Listeners
Used in conjunction with the RuoteAMQP::Participant, the Listener subscribes to a specific direct exchange and monitors for incoming workitems. It expects workitems to arrive serialized as JSON.
Configuration
AMQP configuration is handled by directly manipulating the values of the AMQP.settings
hash, as provided by the AMQP gem. No defaults are set by the participant. The only option
parsed by the initializer of the listener is the queue
key (Hash expected). If no queue
key is set, the listener will subscribe to the ruote
direct exchange for workitems, otherwise it will subscribe to the direct exchange provided.
The participant requires version 0.6.1 or later of the amqp gem.
Usage
Register the listener with the engine:
engine.register_listener( RuoteAMQP::Listener )
The listener leverages the asynchronous nature of the amqp gem, so no timers are setup when initialized.
See the RuoteAMQP::Participant docs for information on sending workitems out to remote participants, and have them send replies to the correct direct exchange specified in the workitem attributes.
Class Attribute Summary collapse
Instance Method Summary collapse
-
#initialize(service_name, options) ⇒ Listener
constructor
Only one option is used (:queue) to determine where to listen for work.
- #stop ⇒ Object
Constructor Details
#initialize(service_name, options) ⇒ Listener
Only one option is used (:queue) to determine where to listen for work
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/ruote-amqp/listener.rb', line 56 def initialize( service_name, ) if q = .delete(:queue) self.class.queue = q end super( service_name, ) #if AMQP.connection.nil? @em_thread = Thread.new { EM.run } unless EM.reactor_running? #end MQ.queue( self.class.queue, :durable => true ).subscribe do || workitem = decode_workitem( ) ldebug { "workitem from '#{self.class.queue}': #{workitem.inspect}" } handle_item( workitem ) end end |
Class Attribute Details
.queue ⇒ Object
48 49 50 |
# File 'lib/ruote-amqp/listener.rb', line 48 def queue @queue ||= 'ruote' end |
Instance Method Details
#stop ⇒ Object
75 76 77 78 79 80 |
# File 'lib/ruote-amqp/listener.rb', line 75 def stop linfo { "Stopping..." } AMQP.stop { EM.stop } #if EM.reactor_running? } @em_thread.join if @em_thread end |