Class: AMQPHelpers::Daemon
- Inherits:
-
Object
- Object
- AMQPHelpers::Daemon
show all
- Defined in:
- lib/amqp_helpers/daemon.rb
Defined Under Namespace
Classes: ChannelError, ConnectionError, Error
Constant Summary
collapse
- DEFAULT_RECONNECT_WAIT_TIME =
10
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(config) ⇒ Daemon
Returns a new instance of Daemon.
18
19
20
21
22
23
24
25
|
# File 'lib/amqp_helpers/daemon.rb', line 18
def initialize(config)
config.each do |key, value|
setter = "#{key}="
if respond_to?(setter)
send(setter, value)
end
end
end
|
Instance Attribute Details
#connection_params ⇒ Object
Returns the value of attribute connection_params.
15
16
17
|
# File 'lib/amqp_helpers/daemon.rb', line 15
def connection_params
@connection_params
end
|
#environment ⇒ Object
50
51
52
|
# File 'lib/amqp_helpers/daemon.rb', line 50
def environment
@environment ||= 'development'
end
|
#exchanges ⇒ Object
Returns the value of attribute exchanges.
15
16
17
|
# File 'lib/amqp_helpers/daemon.rb', line 15
def exchanges
@exchanges
end
|
#logger ⇒ Object
62
63
64
65
66
67
68
|
# File 'lib/amqp_helpers/daemon.rb', line 62
def logger
@logger ||= if environment == 'development'
Logger.new(STDOUT)
else
Syslogger.new(name, Syslog::LOG_PID, Syslog::LOG_LOCAL0)
end
end
|
#name ⇒ Object
Returns the value of attribute name.
15
16
17
|
# File 'lib/amqp_helpers/daemon.rb', line 15
def name
@name
end
|
#queue_name ⇒ Object
58
59
60
|
# File 'lib/amqp_helpers/daemon.rb', line 58
def queue_name
@queue_name ||= "#{Socket.gethostname}.#{name}"
end
|
#queue_params ⇒ Object
54
55
56
|
# File 'lib/amqp_helpers/daemon.rb', line 54
def queue_params
@queue_params ||= {}
end
|
#reconnect_wait_time ⇒ Object
Instance Method Details
#start(&handler) ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/amqp_helpers/daemon.rb', line 27
def start(&handler)
logger.info "Starting #{name} daemon..."
tcp_connection_failure_handler = Proc.new(&method(:handle_tcp_connection_failure))
amqp_params = { on_tcp_connection_failure: tcp_connection_failure_handler}.merge(connection_params)
AMQP.start(amqp_params) do |connection|
connection.on_open(&method(:handle_open))
connection.on_error(&method(:handle_connection_error))
channel = initialize_channel(connection)
connection.on_tcp_connection_loss(&method(:handle_tcp_connection_loss))
connection.on_recovery(&method(:handle_recovery))
queue = initialize_queue(channel)
queue.subscribe(&handler)
show_stopper = Proc.new do |signal|
logger.info "Signal #{signal} received. #{name} is going down... I REPEAT: WE ARE GOING DOWN!"
connection.close { EventMachine.stop }
end
Signal.trap 'INT', show_stopper
Signal.trap 'TERM', show_stopper
end
end
|