Class: Skein::Connected
- Inherits:
-
Object
show all
- Defined in:
- lib/skein/connected.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(config: nil, connection: nil, context: nil, ident: nil) ⇒ Connected
Instance Methods =====================================================
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/skein/connected.rb', line 10
def initialize(config: nil, connection: nil, context: nil, ident: nil)
@mutex = Mutex.new
@config = config
@connection_shared = !connection
@connection = connection
self.connect
@channels = [ ]
@context = context || Skein::Context.new
@ident = ident || @context.ident(self)
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
6
7
8
|
# File 'lib/skein/connected.rb', line 6
def connection
@connection
end
|
#context ⇒ Object
Properties ===========================================================
4
5
6
|
# File 'lib/skein/connected.rb', line 4
def context
@context
end
|
#ident ⇒ Object
Returns the value of attribute ident.
5
6
7
|
# File 'lib/skein/connected.rb', line 5
def ident
@ident
end
|
Instance Method Details
#channel ⇒ Object
84
85
86
|
# File 'lib/skein/connected.rb', line 84
def channel
@channel ||= self.create_channel
end
|
#close ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/skein/connected.rb', line 88
def close
lock do
@channels.each do |channel|
begin
channel.close
rescue => e
if (defined?(MarchHare))
case (e)
when MarchHare::ChannelLevelException, MarchHare::ChannelAlreadyClosed
else
raise e
end
elsif (defined?(Bunny))
case (e)
when Bunny::ChannelAlreadyClosed
else
raise e
end
else
raise e
end
end
end
@channels = [ ]
unless (@connection_shared)
@connection&.close
@connection = nil
end
end
end
|
#connect ⇒ Object
48
49
50
51
52
53
|
# File 'lib/skein/connected.rb', line 48
def connect
@connection ||= repeat_until_not_nil do
@connection_shared = false
Skein::RabbitMQ.connect(@config)
end
end
|
#connection_shared? ⇒ Boolean
24
25
26
|
# File 'lib/skein/connected.rb', line 24
def connection_shared?
@connection_shared
end
|
#create_channel(auto_retry: false) ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/skein/connected.rb', line 61
def create_channel(auto_retry: false)
channel = begin
@connection.create_channel
rescue RuntimeError
sleep(1)
self.reconnect
retry
end
if (channel.respond_to?(:prefetch=))
channel.prefetch = 1
else
channel.prefetch(1)
end
@channels << channel
channel
end
|
#lock ⇒ Object
28
29
30
31
32
|
# File 'lib/skein/connected.rb', line 28
def lock
@mutex.synchronize do
yield
end
end
|
#reconnect ⇒ Object
55
56
57
58
59
|
# File 'lib/skein/connected.rb', line 55
def reconnect
@connection = nil
self.connect
end
|
#repeat_until_not_nil(delay: 1.0) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/skein/connected.rb', line 34
def repeat_until_not_nil(delay: 1.0)
r = nil
loop do
r = yield
break if r
sleep(delay)
end
r
end
|