Class: IProto::EMConnection
Defined Under Namespace
Modules: FixedLengthProtocol
Constant Summary
ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::HEADER_SIZE
Instance Attribute Summary collapse
Instance Method Summary
collapse
#post_init, #receive_data
#next_request_id, #pack_request, #send_request
Constructor Details
#initialize(host, port, reconnect = true) ⇒ EMConnection
Returns a new instance of EMConnection.
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/iproto/em.rb', line 36
# Sets up the initial state of a connection to +host+:+port+.
#
# +reconnect+ acts both as a switch and as a delay: any truthy value turns
# reconnecting on, and a Numeric value is also used as the reconnect timeout
# (every other value falls back to DEFAULT_RECONNECT).
#
# The connection starts in the :init_waiting state with empty request queues,
# the protocol parser is initialised and the shutdown hook is registered.
def initialize(host, port, reconnect = true)
  @host, @port = host, port
  @should_reconnect = reconnect ? true : false
  @reconnect_timeout = reconnect.is_a?(Numeric) ? reconnect : DEFAULT_RECONNECT
  @reconnect_timer = nil
  @connected = :init_waiting
  @waiting_requests = {}
  @waiting_for_connect = []
  init_protocol
  # The flag must be cleared before #shutdown_hook runs, otherwise the hook
  # would consider itself already registered.
  @shutdown_hook = false
  shutdown_hook
end
|
Instance Attribute Details
#_needed_size ⇒ Object
Returns the value of attribute _needed_size.
78
79
80
|
# File 'lib/iproto/em.rb', line 78
# Size the protocol parser expects for the next chunk: HEADER_SIZE while a
# header is awaited, the announced body size while a body is awaited.
def _needed_size; @_needed_size; end
|
#host ⇒ Object
Returns the value of attribute host.
34
35
36
|
# File 'lib/iproto/em.rb', line 34
# Host this connection was created with; also used when reconnecting.
def host; @host; end
|
#port ⇒ Object
Returns the value of attribute port.
34
35
36
|
# File 'lib/iproto/em.rb', line 34
# Port this connection was created with; also used when reconnecting.
def port; @port; end
|
Instance Method Details
#_do_send_request(request_type, body, request) ⇒ Object
156
157
158
159
160
|
# File 'lib/iproto/em.rb', line 156
# Writes one request to the connection and remembers its callback.
#
# Request ids are drawn from #next_request_id until one is found that is not
# already awaiting a response; the packed request is sent and +request+ is
# stored under that id in @waiting_requests.
#
# Returns +request+.
def _do_send_request(request_type, body, request)
  id = next_request_id
  id = next_request_id while @waiting_requests.key?(id)
  packet = pack_request(request_type, id, body)
  send_data(packet)
  @waiting_requests[id] = request
end
|
#_perform_waiting_for_connect(real) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# File 'lib/iproto/em.rb', line 139
# Drains the queue of requests that were issued before the connection was up.
#
# With a truthy +real+ every queued request is scheduled on its own reactor
# tick and sent through #_do_send_request. Otherwise the callbacks are parked
# in @waiting_requests under the negative keys -1, -2, ... without sending
# anything (#discard_requests uses this and then fails every parked callback).
#
# The queue is emptied in both cases; the emptied queue is returned.
def _perform_waiting_for_connect(real)
  if real
    @waiting_for_connect.each do |request_type, body, request|
      ::EM.next_tick { _do_send_request(request_type, body, request) }
    end
  else
    @waiting_for_connect.each_with_index do |(_type, _body, request), position|
      @waiting_requests[-(position + 1)] = request
    end
  end
  @waiting_for_connect.clear
end
|
#_send_request(request_type, body, request) ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/iproto/em.rb', line 122
# Routes a request according to the current connection state.
#
# * connected: the request is sent right away;
# * not connected but #could_be_connected?: the request is queued until the
#   connection is completed, and an immediate reconnect is triggered when the
#   state is :force;
# * otherwise the callback receives an IProto::Disconnected error - on the
#   next reactor tick when the reactor is running, synchronously when not.
def _send_request(request_type, body, request)
  return _do_send_request(request_type, body, request) if @connected == true

  if could_be_connected?
    @waiting_for_connect << [request_type, body, request]
    _setup_reconnect_timer(0) if @connected == :force
  else
    reject = proc do
      do_response(request, IProto::Disconnected.new("connection is closed"))
    end
    ::EM.reactor_running? ? EM.next_tick(&reject) : reject.call
  end
end
|
#_setup_reconnect_timer(timeout) ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/iproto/em.rb', line 107
# Arranges a reconnect to @host:@port unless one is already pending.
#
# A pending reconnect is recognised by a non-nil @reconnect_timer, in which
# case nothing happens and nil is returned. Otherwise the shutdown hook is
# (re)registered and:
#
# * a +timeout+ equal to 0 reconnects immediately, leaving the :waiting
#   marker in @reconnect_timer and switching @connected to :waiting;
# * any other +timeout+ schedules the reconnect with EM.add_timer and keeps
#   the returned timer in @reconnect_timer.
def _setup_reconnect_timer(timeout)
  return unless @reconnect_timer.nil?

  # Placeholder so that re-entrant calls see a reconnect in progress.
  @reconnect_timer = :waiting
  shutdown_hook
  if timeout == 0
    @connected = :waiting
    reconnect(@host, @port)
  else
    @reconnect_timer = ::EM.add_timer(timeout) { reconnect(@host, @port) }
  end
end
|
#close ⇒ Object
162
163
164
|
# File 'lib/iproto/em.rb', line 162
# Closes the connection for good by delegating to #close_connection with
# +false+ (presumably EventMachine's "after writing" flag - confirm against
# the superclass).
def close
  after_writing = false
  close_connection(after_writing)
end
|
#close_connection(*args) ⇒ Object
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/iproto/em.rb', line 166
# Shuts the connection down on request of the user.
#
# Reconnecting is switched off (@should_reconnect becomes nil, which #unbind
# treats as "do nothing"), a scheduled reconnect timer is cancelled, the
# underlying connection is closed through +super+ only when it is actually
# established, and every outstanding request is failed via #discard_requests.
def close_connection(*args)
  @should_reconnect = nil
  # Only integer values are timer handles; :waiting is a mere marker.
  ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
  @reconnect_timer = nil
  super(*args) if @connected == true
  @connected = false
  discard_requests
end
|
#connected? ⇒ Boolean
50
51
52
|
# File 'lib/iproto/em.rb', line 50
# True only when the connection is fully established; the intermediate
# states (:init_waiting, :waiting, :force) and false do not count.
def connected?
  @connected.equal?(true)
end
|
#connection_completed ⇒ Object
72
73
74
75
76
|
# File 'lib/iproto/em.rb', line 72
# Callback for an established connection: marks the connection as connected,
# forgets the reconnect timer and sends every request that was queued while
# the connection was coming up.
def connection_completed
  @connected = true
  @reconnect_timer = nil
  _perform_waiting_for_connect(true)
end
|
#could_be_connected? ⇒ Boolean
54
55
56
|
# File 'lib/iproto/em.rb', line 54
# Tells whether a request issued now still has a chance to be sent.
#
# A falsy state means no. The :force state is only usable while the reactor
# is running (the reactor is not consulted for any other state). Every other
# truthy state means yes.
def could_be_connected?
  return @connected unless @connected
  return true unless @connected == :force

  ::EM.reactor_running?
end
|
#discard_requests ⇒ Object
180
181
182
183
184
185
186
187
|
# File 'lib/iproto/em.rb', line 180
# Fails every outstanding request with one shared IProto::Disconnected error.
#
# Requests still queued for connect are first parked in @waiting_requests
# (see #_perform_waiting_for_connect with +false+), then each callback is
# removed from @waiting_requests and invoked with the error. The ids are
# snapshotted up front, so the hash may change while callbacks run.
#
# Returns the list of ids that were discarded.
def discard_requests
  error = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)
  pending_ids = @waiting_requests.keys
  pending_ids.each { |id| do_response(@waiting_requests.delete(id), error) }
end
|
#do_response(request, data) ⇒ Object
103
104
105
|
# File 'lib/iproto/em.rb', line 103
# Hands +data+ (a response body or an error object) to the +request+
# callback and returns whatever the callback returns.
def do_response(request, data)
  request.call(data)
end
|
#init_protocol ⇒ Object
79
80
81
82
|
# File 'lib/iproto/em.rb', line 79
# Puts the response parser into its initial state: a header of HEADER_SIZE
# is expected next. Returns the new state, :receive_header.
def init_protocol
  @_needed_size, @_state = HEADER_SIZE, :receive_header
  @_state
end
|
#receive_chunk(chunk) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/iproto/em.rb', line 84
# Consumes one chunk of the size previously announced through #_needed_size
# and, once a whole response is available, hands it to the waiting callback.
#
# In the :receive_header state the body size is read at offset 4 and the
# request id at offset 8 of +chunk+. A positive body size switches the parser
# to :receive_body and returns nil until that body arrives; a response
# without a body is delivered as an empty string. In the :receive_body state
# +chunk+ is the body of the response whose header was parsed last.
#
# Raises IProto::UnexpectedResponse when no callback is registered for the
# request id. The parser is re-armed for the next header before that check,
# so an unexpected response does not desynchronise the stream.
def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    end
    chunk = ''
  end

  # Reset first: if the lookup below raises while we are in :receive_body,
  # the next header must not be mistaken for another body.
  @_needed_size = HEADER_SIZE
  @_state = :receive_header

  request = @waiting_requests.delete(@request_id)
  raise IProto::UnexpectedResponse, "For request id #{@request_id}" unless request

  do_response(request, chunk)
end
|
#shutdown_hook ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/iproto/em.rb', line 58
# Registers, at most once at a time, a reactor shutdown hook for this
# connection. Returns true when a hook was registered and nil when one was
# already in place.
#
# When the hook fires the connection is marked :force if reconnecting is
# enabled (false otherwise), a scheduled reconnect timer is cancelled, and
# the registration flag is cleared so that the hook can be installed again.
def shutdown_hook
  return if @shutdown_hook

  ::EM.add_shutdown_hook do
    @connected = @should_reconnect ? :force : false
    # Only integer values are timer handles; :waiting is a mere marker.
    ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
    @reconnect_timer = nil
    @shutdown_hook = false
  end
  @shutdown_hook = true
end
|
#unbind ⇒ Object
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/iproto/em.rb', line 189
# Callback for a lost (or never established) connection.
#
# Outstanding requests are failed while the connection is temporarily marked
# as closed; the previous state is then restored because the handling below
# depends on it:
#
# * reconnect enabled (true): the timer is cleared and, unless the state is
#   :force, the connection is marked closed and a reconnect is scheduled
#   after @reconnect_timeout;
# * reconnect disabled (false): IProto::CouldNotConnect is raised when the
#   connection never came up (:init_waiting), IProto::Disconnected otherwise;
# * closed on purpose (nil, see #close_connection): nothing more happens.
def unbind
  state_before = @connected
  @connected = false
  discard_requests
  @connected = state_before

  if @should_reconnect == true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  elsif @should_reconnect == false
    raise(@connected == :init_waiting ? IProto::CouldNotConnect : IProto::Disconnected)
  end
end
|