Module: AMQP::Boilerplate
- Extended by:
- ConsumerPrefetch, ConsumerRegistry, ForceConsumers, Logging
- Defined in:
- lib/amqp/boilerplate.rb,
lib/amqp/boilerplate/logging.rb,
lib/amqp/boilerplate/version.rb,
lib/amqp/boilerplate/consumer.rb,
lib/amqp/boilerplate/producer.rb,
lib/amqp/boilerplate/force_consumers.rb,
lib/amqp/boilerplate/consumer_prefetch.rb,
lib/amqp/boilerplate/consumer_registry.rb
Defined Under Namespace
Modules: ConsumerPrefetch, ConsumerRegistry, ForceConsumers, Logging, Producer
Classes: Consumer
Constant Summary
- VERSION = "1.2.0"
Instance Attribute Summary
Attributes included from ConsumerRegistry
Attributes included from Logging
Attributes included from ForceConsumers
Attributes included from ConsumerPrefetch
Class Method Summary
- .boot ⇒ void
  Opens a channel to AMQP and starts all consumers.
- .configure {|_self| ... } ⇒ Object
  Configures AMQP::Boilerplate and yields the AMQP::Boilerplate object to the block.
- .connection_options ⇒ Object
- .connection_options=(options) ⇒ Object
  AMQP connection options (:host, :port, :username, :vhost, :password) that are passed to AMQP.start when starting an EventMachine event loop.
- .on_unhandled_consumer_exception ⇒ Object
- .on_unhandled_consumer_exception=(handler) ⇒ Object
  Pass a Proc object to this option to handle uncaught exceptions in a consumer.
- .shutdown ⇒ void
  Tries to gracefully close the channel and stop EventMachine.
- .start ⇒ Object
Methods included from ConsumerRegistry
load_consumers, register_consumer, registry, start_consumers
Class Method Details
.boot ⇒ void
This method returns an undefined value.
Opens a channel to AMQP and starts all consumers.
NOTE: When the server type is unknown, the consumers are NOT started unless force_consumers is set. A channel is still opened for the producers.
    # File 'lib/amqp/boilerplate.rb', line 27

    def self.boot
      if AMQP::Utilities::EventLoopHelper.server_type == :passenger
        ::PhusionPassenger.on_event(:starting_worker_process) do |forked|
          if forked
            Thread.new do
              AMQP::Boilerplate.start
            end
          end
        end
      else
        AMQP::Utilities::EventLoopHelper.run do
          AMQP::Boilerplate.start
        end
      end

      sleep(0.25)

      AMQP::Boilerplate.logger.info("[#{self.name}.boot] Started AMQP (Server Type: #{AMQP::Utilities::EventLoopHelper.server_type || 'unknown'})")

      EventMachine.next_tick do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connection)

        load_consumers

        if AMQP::Utilities::EventLoopHelper.server_type || force_consumers
          AMQP::Boilerplate.logger.debug("[#{self.name}.boot] Starting consumers")
          start_consumers
        else
          AMQP::Boilerplate.logger.debug("[#{self.name}.boot] Unknown server type, not starting consumers")
        end
      end
    end
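The condition that decides whether consumers start can be isolated as a small predicate. This is only a sketch of that one line of .boot; the method name start_consumers? is hypothetical and is not part of AMQP::Boilerplate.

```ruby
# Hypothetical helper mirroring the check in .boot:
# consumers start when the server type is known, or when
# force_consumers is truthy.
def start_consumers?(server_type, force_consumers)
  !!(server_type || force_consumers)
end

start_consumers?(:passenger, false) # => true
start_consumers?(nil, false)        # => false
start_consumers?(nil, true)         # => true
```

In other words, force_consumers only matters when the server type could not be detected.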
.configure {|_self| ... } ⇒ Object
Configures AMQP::Boilerplate and yields the AMQP::Boilerplate object to the block.
    # File 'lib/amqp/boilerplate.rb', line 87

    def self.configure
      yield self if block_given?
    end
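A minimal sketch of how a configure block might be used. To keep it runnable without the amqp gem, it uses a stand-in module, BoilerplateSketch (a hypothetical name), that copies only the configure method and the connection_options accessor documented on this page. The broker settings are placeholder values.

```ruby
# Stand-in for AMQP::Boilerplate so the example runs on its own.
# It reproduces only .configure and the connection_options accessor.
module BoilerplateSketch
  class << self
    attr_accessor :connection_options

    # Yields the module itself, as AMQP::Boilerplate.configure does.
    def configure
      yield self if block_given?
    end
  end
end

BoilerplateSketch.configure do |config|
  # Placeholder broker settings; replace with your own.
  config.connection_options = {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/"
  }
end

BoilerplateSketch.connection_options[:host] # => "localhost"
```

With the real library, the same block would presumably be passed to AMQP::Boilerplate.configure before calling AMQP::Boilerplate.boot.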
.connection_options ⇒ Object
    # File 'lib/amqp/boilerplate.rb', line 91

    def self.connection_options
      @connection_options
    end
.connection_options=(options) ⇒ Object
AMQP connection options (:host, :port, :username, :vhost, :password) that are passed to AMQP.start when starting an EventMachine event loop.

    # File 'lib/amqp/boilerplate.rb', line 98

    def self.connection_options=(options)
      @connection_options = options
    end
.on_unhandled_consumer_exception ⇒ Object
    # File 'lib/amqp/boilerplate.rb', line 102

    def self.on_unhandled_consumer_exception
      @on_unhandled_consumer_exception
    end
.on_unhandled_consumer_exception=(handler) ⇒ Object
Pass a Proc object to this option that will function as a handler for uncaught exceptions in a consumer.

    # File 'lib/amqp/boilerplate.rb', line 108

    def self.on_unhandled_consumer_exception=(handler)
      @on_unhandled_consumer_exception = handler
    end
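A handler could look roughly like the Proc below. This page does not state which arguments the handler receives, so the signature is an assumption: the exception comes first, and any further arguments are collected in context. The invocation at the end is simulated and only stands in for what a consumer would do.

```ruby
# Assumption: the handler is called with the exception first; any extra
# arguments the library may pass are collected in *context.
handler = Proc.new do |exception, *context|
  warn "[consumer] unhandled #{exception.class}: #{exception.message}"
  :handled
end

# With the real library this would be registered as:
#   AMQP::Boilerplate.on_unhandled_consumer_exception = handler

# Simulated invocation, standing in for a failing consumer.
result =
  begin
    raise ArgumentError, "bad payload"
  rescue => e
    handler.call(e)
  end

result # => :handled
```

Using a splat for the remaining arguments keeps the handler tolerant of whatever arity the consumer actually uses.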
.shutdown ⇒ void
This method returns an undefined value.
Tries to gracefully close the channel and stop EventMachine.
TODO: Close all open channels, not just the default one.
    # File 'lib/amqp/boilerplate.rb', line 65

    def self.shutdown
      AMQP.stop do
        AMQP::Boilerplate.logger.debug("[#{self.name}.shutdown] Attempting graceful channel and EventMachine shutdown")
        EventMachine.stop
      end

      # 5 seconds is an arbitrary delay
      EventMachine::Timer.new(5) do
        AMQP::Boilerplate.logger.debug("[#{self.name}.shutdown] Graceful shutdown of EM didn't work, forcing shutdown")
        EventMachine.stop
      end
    end
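The idea behind .shutdown, attempting a graceful stop and forcing one after a deadline, can be sketched in plain Ruby. This is a thread-based analogue and not what the library does: .shutdown relies on AMQP.stop and an EventMachine::Timer. The method name shutdown_with_deadline and its parameters are hypothetical.

```ruby
# Thread-based analogue of "graceful first, forced after a deadline".
# graceful and force are callables supplied by the caller.
def shutdown_with_deadline(deadline_seconds, graceful:, force:)
  worker = Thread.new { graceful.call }

  # Thread#join returns nil when the time limit expires.
  if worker.join(deadline_seconds)
    :graceful
  else
    worker.kill
    force.call
    :forced
  end
end

quick = shutdown_with_deadline(1, graceful: -> { :closed }, force: -> {})
stuck = shutdown_with_deadline(0.1, graceful: -> { sleep 5 }, force: -> {})

quick # => :graceful
stuck # => :forced
```

As in .shutdown, the deadline is arbitrary; it only bounds how long a stuck graceful stop can delay the process exit.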
.start ⇒ Object
    # File 'lib/amqp/boilerplate.rb', line 112

    def self.start
      AMQP.start self.connection_options
    end