Module: AMQP::Boilerplate

Extended by:
ConsumerPrefetch, ConsumerRegistry, ForceConsumers, Logging
Defined in:
lib/amqp/boilerplate.rb,
lib/amqp/boilerplate/logging.rb,
lib/amqp/boilerplate/version.rb,
lib/amqp/boilerplate/consumer.rb,
lib/amqp/boilerplate/producer.rb,
lib/amqp/boilerplate/force_consumers.rb,
lib/amqp/boilerplate/consumer_prefetch.rb,
lib/amqp/boilerplate/consumer_registry.rb

Defined Under Namespace

Modules: ConsumerPrefetch, ConsumerRegistry, ForceConsumers, Logging, Producer Classes: Consumer

Constant Summary collapse

VERSION =
"1.2.0"

Instance Attribute Summary

Attributes included from ConsumerRegistry

#consumer_paths

Attributes included from Logging

#logger

Attributes included from ForceConsumers

#force_consumers

Attributes included from ConsumerPrefetch

#consumer_prefetch

Class Method Summary collapse

Methods included from ConsumerRegistry

load_consumers, register_consumer, registry, start_consumers

Class Method Details

.bootvoid

This method returns an undefined value.

Opens a channel to AMQP and starts all consumers

NOTE When an unknown server type is encountered the consumers will NOT be started. A channel will be opened for the producers though.

See Also:

  • Utilities::EventLoopHelper


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/amqp/boilerplate.rb', line 27

def self.boot
  if AMQP::Utilities::EventLoopHelper.server_type == :passenger
    ::PhusionPassenger.on_event(:starting_worker_process) do |forked|
      if forked
        Thread.new do
          AMQP::Boilerplate.start
        end
      end
    end
  else
    AMQP::Utilities::EventLoopHelper.run do
      AMQP::Boilerplate.start
    end
  end

  sleep(0.25)

  AMQP::Boilerplate.logger.info("[#{self.name}.boot] Started AMQP (Server Type: #{AMQP::Utilities::EventLoopHelper.server_type || 'unknown'})")

  EventMachine.next_tick do
    AMQP.channel ||= AMQP::Channel.new(AMQP.connection)

    load_consumers

    if AMQP::Utilities::EventLoopHelper.server_type || force_consumers
      AMQP::Boilerplate.logger.debug("[#{self.name}.boot] Starting consumers")
      start_consumers
    else
      AMQP::Boilerplate.logger.debug("[#{self.name}.boot] Unknown server type, not starting consumers")
    end
  end
end

.configure {|_self| ... } ⇒ Object

Configures AMQP::Boilerplate and yields AMQP::Boilerplate object to the block

Examples:

AMQP::Boilerplate.configure do |config|
  config.logger = ::Rails.logger
  config.consumer_paths += %W( #{Rails.root}/app/consumers )
  config.connection_options = { :host => "localhost", :port => 5672, :vhost => Rails.env }
  config.on_unhandled_exception = Proc.new { |exception, consumer, , payload| puts "Do something with exceptions: #{exception}" }
end

Yields:

  • (_self)

Yield Parameters:



87
88
89
# File 'lib/amqp/boilerplate.rb', line 87

def self.configure
  yield self if block_given?
end

.connection_optionsObject



91
92
93
# File 'lib/amqp/boilerplate.rb', line 91

def self.connection_options
  @connection_options
end

.connection_options=(options) ⇒ Object

AMQP connection options (:host, :port, :username, :vhost, :password) that will be passed as connection_options to AMQP#start when starting an EventMachine event loop.



98
99
100
# File 'lib/amqp/boilerplate.rb', line 98

def self.connection_options=(options)
  @connection_options = options
end

.on_unhandled_consumer_exceptionObject



102
103
104
# File 'lib/amqp/boilerplate.rb', line 102

def self.on_unhandled_consumer_exception
  @on_unhandled_consumer_exception
end

.on_unhandled_consumer_exception=(handler) ⇒ Object

Pass a Proc object to this option that will function as a handler for uncaught exceptions in a consumer.



108
109
110
# File 'lib/amqp/boilerplate.rb', line 108

def self.on_unhandled_consumer_exception=(handler)
  @on_unhandled_consumer_exception = handler
end

.shutdownvoid

This method returns an undefined value.

Try to gracefully close channel and stop EventMachine

TODO Close all open channels and not just the default one



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/amqp/boilerplate.rb', line 65

def self.shutdown
  AMQP.stop do
    AMQP::Boilerplate.logger.debug("[#{self.name}.shutdown] Attempting graceful channel and EventMachine shutdown")
    EventMachine.stop
  end

  # 5 seconds is an arbitrary delay
  EventMachine::Timer.new(5) do
    AMQP::Boilerplate.logger.debug("[#{self.name}.shutdown] Graceful shutdown of EM didn't work, forcing shutdown")
    EventMachine.stop
  end
end

.startObject



112
113
114
# File 'lib/amqp/boilerplate.rb', line 112

def self.start
  AMQP.start self.connection_options
end