Class: RoMQ::Connection
- Inherits:
-
Object
show all
- Includes:
- Helpers
- Defined in:
- lib/romq/connection.rb
Constant Summary
Constants included
from Helpers
Helpers::DEBUG
Instance Method Summary
collapse
Methods included from Helpers
#flush_logs_periodically, #logger, #stop_gracefully
Constructor Details
#initialize(&block) ⇒ Connection
Returns a new instance of Connection.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/romq/connection.rb', line 10
# Opens the RabbitMQ connection inside +EM.run+ and yields an AMQP channel to
# the caller's block from the connection's +on_open+ callback.
#
# Connection settings come from +Config.rabbitmq_connection+. The channel is
# created with +auto_recovery: true+, gets +handle_channel_error+ as its error
# callback and a prefetch of +Config::RABBITMQ_PREFECTH+.
#
# +AMQP::TCPConnectionFailed+ is rescued and handed to +reconnect+ together
# with the caller's block; +AMQP::PossibleAuthenticationFailureError+ is
# rescued, logged, and followed by +stop+.
#
# @yield [channel] called once the connection reports it is open
# @yieldparam channel [AMQP::Channel] the freshly configured channel
# @raise [RuntimeError] from the connection-level error callback, with the
#   close frame's +reply_text+ as the message
def initialize(&block)
  stop_gracefully
  EM.run do
    flush_logs_periodically.()
    connection = Config.rabbitmq_connection
    @amqp_connection = AMQP.connect(connection)
    # The close frame is the second callback argument. It used to be referenced
    # under a name that did not exist in this scope, which raised NameError and
    # hid the real reply text.
    @amqp_connection.on_error do |_session, connection_close|
      raise connection_close.reply_text
    end
    @amqp_connection.on_open do
      logger.info("[RoMQ] connected to #{connection[:host]}")
      # Registered only after a successful open; the caller's block is passed
      # along so reconnect can use it again.
      @amqp_connection.on_tcp_connection_loss do
        reconnect.(block)
      end
      channel = AMQP::Channel.new(@amqp_connection, auto_recovery: true)
      channel.on_error(&handle_channel_error)
      channel.prefetch(Config::RABBITMQ_PREFECTH)
      yield(channel)
    end
  end
rescue AMQP::TCPConnectionFailed
  reconnect.(block)
rescue AMQP::PossibleAuthenticationFailureError
  # NOTE(review): this logs the full settings object, which presumably
  # includes the password -- confirm that is acceptable for this log sink.
  logger.info("[RoMQ] could not authenticate, check your credentials: #{@amqp_connection.settings}")
  stop.()
end
|
Instance Method Details
#handle_channel_error ⇒ Object
65
66
67
68
69
70
71
72
73
|
# File 'lib/romq/connection.rb', line 65
# Builds the callback handed to +channel.on_error+.
#
# The callback ignores its first argument and writes five error-level log
# lines describing the second one: a fixed header followed by the frame's
# +class_id+, +method_id+, +reply_code+ and +reply_text+.
#
# @return [Proc] a non-lambda proc taking +(channel, channel_close)+
def handle_channel_error
  proc do |_channel, close_frame|
    logger.error("[RoMQ] channel-level exception")
    # Each reader is called just before its line is logged, in this order.
    [
      ["[RoMQ] AMQP class id : ", :class_id],
      ["[RoMQ] AMQP method id: ", :method_id],
      ["[RoMQ] Status code : ", :reply_code],
      ["[RoMQ] Error message : ", :reply_text]
    ].each do |prefix, reader|
      logger.error("#{prefix}#{close_frame.public_send(reader)}")
    end
  end
end
|
#stop ⇒ Object
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/romq/connection.rb', line 52
# Builds a callable that closes the AMQP connection, when there is a
# connected one, before delegating to the inherited +stop+.
#
# With a connected +@amqp_connection+ the delegation happens inside the
# +disconnect+ callback, after the closure has been logged. Otherwise the
# inherited +stop+ is called straight away.
#
# @return [Proc] the shutdown callable; invoke it with +.()+ or +call+
def stop
  proc do
    connected = @amqp_connection && @amqp_connection.connected?
    # Nothing to close: go straight to the inherited behaviour.
    next super unless connected

    @amqp_connection.disconnect do
      logger.info("[RoMQ] closed connection to #{@amqp_connection.settings[:host]}")
      super
    end
  end
end
|