Class: Evrone::Common::AMQP::Session
- Inherits:
-
Object
- Object
- Evrone::Common::AMQP::Session
show all
- Includes:
- Logger
- Defined in:
- lib/evrone/common/amqp/session.rb
Defined Under Namespace
Classes: ConnectionDoesNotExist
Constant Summary
collapse
- CHANNEL_KEY =
:evrone_amqp_channel
- @@session_lock =
Mutex.new
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
15
16
17
|
# File 'lib/evrone/common/amqp/session.rb', line 15
# Reader for the underlying connection object.
#
# @return [Object, nil] the value of @conn: the Bunny session assigned by
#   #open, or nil before the first #open and after #close has cleared it
def conn
@conn
end
|
Class Method Details
.resume ⇒ Object
26
27
28
|
# File 'lib/evrone/common/amqp/session.rb', line 26
# Clears the class-level shutdown flag (a class method; @shutdown lives on
# the class itself). #open calls this before it connects.
#
# @return [false] the value just assigned
def resume
@shutdown = false
end
|
.shutdown ⇒ Object
18
19
20
|
# File 'lib/evrone/common/amqp/session.rb', line 18
# Raises the class-level shutdown flag (a class method; @shutdown lives on
# the class itself). The flag stays set until .resume clears it.
#
# @return [true] the value just assigned
def shutdown
@shutdown = true
end
|
.shutdown? ⇒ Boolean
22
23
24
|
# File 'lib/evrone/common/amqp/session.rb', line 22
# Reports whether the class-level shutdown flag is currently raised.
#
# @return [Boolean] true only when @shutdown is exactly true; false when it
#   is false or has never been assigned (nil)
def shutdown?
@shutdown == true
end
|
Instance Method Details
#channel ⇒ Object
95
96
97
98
99
|
# File 'lib/evrone/common/amqp/session.rb', line 95
# Resolves the channel the calling thread should use.
#
# A channel stored in the thread-local CHANNEL_KEY slot (which #with_channel
# installs for the duration of its block) takes precedence; otherwise the
# connection's default channel is used.
#
# @return [Object] the thread-scoped channel, or conn.default_channel
# @note assert_connection_is_open runs first; it is defined outside this
#   view and presumably raises when the session is not open.
def channel
  assert_connection_is_open
  scoped = Thread.current[CHANNEL_KEY]
  return scoped if scoped

  conn.default_channel
end
|
#close ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/evrone/common/amqp/session.rb', line 31
# Closes the connection if it is open and blocks until the connection
# reports the :closed status, then forgets it (@conn becomes nil).
#
# A Bunny::ChannelError raised by the underlying close is logged as a
# warning and does not abort the shutdown sequence.
#
# @return [Object, nil] nil when there was nothing to close; otherwise the
#   result of the final log call
def close
  # Unlocked fast path: callers with nothing to close never touch the mutex.
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock. Another thread may have finished closing (and
    # reset @conn to nil) while this one was waiting for the mutex; without
    # this check the calls below would be sent to nil.
    if open?
      info "closing connection"
      begin
        conn.close
      rescue Bunny::ChannelError => e
        warn e
      end
      info "wait..."
      # NOTE(review): this wait has no upper bound; it relies on the
      # connection eventually reaching :closed.
      sleep 0.01 while conn.status != :closed
      @conn = nil
      info "connection closed"
    end
  end
end
|
#config ⇒ Object
128
129
130
|
# File 'lib/evrone/common/amqp/session.rb', line 128
# Shortcut to the shared AMQP configuration object exposed by Common::AMQP.
# Within this class it supplies the broker URL (#open) and the default
# exchange name (#declare_exchange).
#
# @return [Object] whatever Common::AMQP.config returns
def config
  amqp = Common::AMQP
  amqp.config
end
|
#conn_info ⇒ Object
115
116
117
118
119
|
# File 'lib/evrone/common/amqp/session.rb', line 115
# Human-readable description of the connection target, used in log lines.
#
# @return [String, nil] "user:host:port/vhost" built from the connection's
#   attributes, or nil when there is no connection object
def conn_info
  return unless conn

  user, host = conn.user, conn.host
  port, vhost = conn.port, conn.vhost
  "#{user}:#{host}:#{port}/#{vhost}"
end
|
#declare_exchange(name, options = nil) ⇒ Object
76
77
78
79
80
81
82
83
84
|
# File 'lib/evrone/common/amqp/session.rb', line 76
# Declares an exchange on the given or current channel.
#
# @param name [String, nil] exchange name; nil falls back to
#   config.default_exchange_name
# @param options [Hash, nil] declaration options. The :channel entry, when
#   present, selects the channel to declare on instead of #channel; the
#   remaining entries are handed to get_exchange_type_and_options (defined
#   outside this view) to derive the exchange type and options.
# @return [Object] whatever the channel's #exchange call returns
def declare_exchange(name, options = nil)
  assert_connection_is_open
  # Work on a copy: :channel is removed below, and the caller's hash (which
  # may be frozen or reused for several declarations) must stay untouched.
  options = options ? options.dup : {}
  name ||= config.default_exchange_name
  ch = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options options
  ch.exchange name, opts.merge(type: type)
end
|
#declare_queue(name, options = nil) ⇒ Object
86
87
88
89
90
91
92
93
|
# File 'lib/evrone/common/amqp/session.rb', line 86
# Declares a queue on the given or current channel.
#
# @param name [String, nil] requested queue name; the final name is decided
#   by get_queue_name_and_options (defined outside this view)
# @param options [Hash, nil] declaration options. The :channel entry, when
#   present, selects the channel to declare on instead of #channel; the
#   remaining entries are handed to get_queue_name_and_options.
# @return [Object] whatever the channel's #queue call returns
def declare_queue(name, options = nil)
  assert_connection_is_open
  # Work on a copy: :channel is removed below, and the caller's hash (which
  # may be frozen or reused for several declarations) must stay untouched.
  options = options ? options.dup : {}
  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue name, opts
end
|
#open ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/evrone/common/amqp/session.rb', line 50
# Opens the connection unless it is already open, and blocks until the
# connection attempt has finished.
#
# The work happens under the class-wide session lock: the shutdown flag is
# cleared via .resume, the Bunny session is created once from config.url
# (with heartbeat: :server) and reused afterwards, and it is started only
# when it does not already report itself as open.
#
# @return [self] always, so calls can be chained
def open
  unless open?
    @@session_lock.synchronize do
      self.class.resume
      @conn ||= Bunny.new(config.url, heartbeat: :server)
      # Nothing left to do if the session is already up.
      next if conn.open?

      info "connecting to #{conn_info}"
      conn.start
      info "wait connection to #{conn_info}"
      # Poll until the session leaves the connecting state.
      sleep 0.01 while conn.connecting?
      info "connected successfuly (#{server_name})"
    end
  end
  self
end
|
#open? ⇒ Boolean
72
73
74
|
# File 'lib/evrone/common/amqp/session.rb', line 72
# Tells whether a connection exists and is fully open.
#
# @return [Boolean, nil] a truthy value only when there is a connection, it
#   answers open? and its status is :open; nil when there is no connection
def open?
  # Read the connection once. #close resets @conn to nil under its lock while
  # this predicate is called without it, so repeated reads could see the
  # connection disappear between checks and send a message to nil.
  session = conn
  session && session.open? && session.status == :open
end
|
#server_name ⇒ Object
121
122
123
124
125
126
|
# File 'lib/evrone/common/amqp/session.rb', line 121
# Describes the broker as "product/version", taken from the properties the
# server reported on the connection.
#
# @return [String, nil] e.g. "RabbitMQ/3.1.0"; missing properties render as
#   empty strings; nil when there is no connection object
def server_name
  return unless conn

  props = conn.server_properties || {}
  product = props["product"]
  version = props["version"]
  "#{product}/#{version}"
end
|
#with_channel ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/evrone/common/amqp/session.rb', line 101
# Runs the block with a freshly created channel installed as the calling
# thread's current channel (the one #channel returns), then restores the
# previously installed channel and closes the temporary one.
#
# @yield runs with the new channel stored in the thread-local CHANNEL_KEY slot
# @return [Object] the block's result
def with_channel
  assert_connection_is_open
  previous = Thread.current[CHANNEL_KEY]
  # Create the channel before entering the protected region: if creation
  # fails there is nothing to undo, and the thread's current channel (for
  # example one installed by an enclosing with_channel) must stay in place.
  scoped = conn.create_channel
  begin
    Thread.current[CHANNEL_KEY] = scoped
    yield
  ensure
    Thread.current[CHANNEL_KEY] = previous
    scoped.close if scoped && scoped.open?
  end
end
|