Class: IProto::EMConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
ConnectionAPI, FixedLengthProtocol
Defined in:
lib/iproto/em.rb

Direct Known Subclasses

EMCallbackConnection, EMFiberedConnection

Defined Under Namespace

Modules: FixedLengthProtocol

Constant Summary

Constants included from ConnectionAPI

ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::EMPTY_STR, ConnectionAPI::HEADER_SIZE, ConnectionAPI::PING, ConnectionAPI::PING_ID

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FixedLengthProtocol

#buffer_reset, #post_init, #receive_data

Methods included from ConnectionAPI

#next_request_id, #pack_request, #send_request

Constructor Details

#initialize(host, port, reconnect = true) ⇒ EMConnection

Returns a new instance of EMConnection.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/iproto/em.rb', line 40

def initialize(host, port, reconnect = true)
  @host = host
  @port = port
  @reconnect_timeout = Numeric === reconnect ? reconnect : DEFAULT_RECONNECT
  @should_reconnect = !!reconnect
  @reconnect_timer = nil
  @ping_timer = nil
  @connected = :init_waiting
  @waiting_requests = {}
  @waiting_for_connect = []
  @shutdown_hook = false
  @inactivity_timeout = 0
  init_protocol
  shutdown_hook
end

Instance Attribute Details

#_needed_sizeObject (readonly)

Returns the value of attribute _needed_size.



106
107
108
# File 'lib/iproto/em.rb', line 106

def _needed_size
  @_needed_size
end

#hostObject (readonly)

Returns the value of attribute host.



38
39
40
# File 'lib/iproto/em.rb', line 38

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



38
39
40
# File 'lib/iproto/em.rb', line 38

def port
  @port
end

Instance Method Details

#_do_send_request(request_type, body, request) ⇒ Object



194
195
196
197
198
# File 'lib/iproto/em.rb', line 194

def _do_send_request(request_type, body, request)
  while @waiting_requests.include?(request_id = next_request_id); end
  send_data pack_request(request_type, request_id, body)
  @waiting_requests[request_id] = request
end

#_perform_waiting_for_connect(real) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/iproto/em.rb', line 177

def _perform_waiting_for_connect(real)
  if real
    @waiting_for_connect.each do |request_type, body, request|
      ::EM.next_tick{
      _do_send_request(request_type, body, request)
      }
    end
  else
    i = -1
    @waiting_for_connect.each do |request_type, body, request|
      @waiting_requests[i] = request
      i -= 1
    end
  end
  @waiting_for_connect.clear
end

#_pingObject



113
114
115
# File 'lib/iproto/em.rb', line 113

def _ping
  send_data pack_request(PING, PING_ID, EMPTY_STR)
end

#_send_request(request_type, body, request) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/iproto/em.rb', line 160

def _send_request(request_type, body, request)
  if @connected == true
    _do_send_request(request_type, body, request)
  elsif could_be_connected?
    @waiting_for_connect << [request_type, body, request]
    if @connected == :force
      _setup_reconnect_timer(0)
    end
  elsif ::EM.reactor_running?
    EM.next_tick{
      do_response(request, IProto::Disconnected.new("connection is closed"))
    }
  else
    do_response(request, IProto::Disconnected.new("connection is closed"))
  end
end

#_setup_reconnect_timer(timeout) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/iproto/em.rb', line 145

def _setup_reconnect_timer(timeout)
  if @reconnect_timer.nil?
    @reconnect_timer = :waiting
    shutdown_hook
    if timeout == 0
      @connected = :waiting
      reconnect @host, @port
    else
      @reconnect_timer = ::EM.add_timer(timeout) do
        reconnect @host, @port
      end
    end
  end
end

#_start_pingerObject



56
57
58
59
60
# File 'lib/iproto/em.rb', line 56

def _start_pinger
  if @connected == true && (cit = comm_inactivity_timeout) != 0 && @ping_timer == nil
    @ping_timer = EM.add_periodic_timer([1, cit / 4.0].min, method(:_ping))
  end
end

#_stop_pingerObject



62
63
64
65
66
67
# File 'lib/iproto/em.rb', line 62

def _stop_pinger
  if @ping_timer
    @ping_timer.cancel
    @ping_timer = nil
  end
end

#closeObject



200
201
202
# File 'lib/iproto/em.rb', line 200

def close
  close_connection(false)
end

#close_connection(*args) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/iproto/em.rb', line 204

def close_connection(*args)
  @should_reconnect = nil
  if Integer === @reconnect_timer
    ::EM.cancel_timer @reconnect_timer
  end
  @reconnect_timer = nil

  if @connected == true
    super(*args)
  end
  @connected = false
  discard_requests
end

#comm_inactivity_timeout=(t) ⇒ Object



69
70
71
72
73
74
# File 'lib/iproto/em.rb', line 69

def comm_inactivity_timeout=(t)
  _stop_pinger
  @inactivity_timeout = t
  super
  _start_pinger
end

#connected?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/iproto/em.rb', line 76

def connected?
  @connected == true
end

#connection_completedObject



98
99
100
101
102
103
104
# File 'lib/iproto/em.rb', line 98

def connection_completed
  @reconnect_timer = nil
  @connected = true
  init_protocol
  self.comm_inactivity_timeout= @inactivity_timeout
  _perform_waiting_for_connect(true)
end

#could_be_connected?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/iproto/em.rb', line 80

def could_be_connected?
  @connected && (@connected != :force || ::EM.reactor_running?)
end

#discard_requestsObject



218
219
220
221
222
223
224
225
# File 'lib/iproto/em.rb', line 218

def discard_requests
  exc = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)
  @waiting_requests.keys.each do |req|
    request = @waiting_requests.delete req
    do_response request, exc
  end
end

#do_response(request, data) ⇒ Object



141
142
143
# File 'lib/iproto/em.rb', line 141

def do_response(request, data)
  request.call data
end

#init_protocolObject



107
108
109
110
111
# File 'lib/iproto/em.rb', line 107

def init_protocol
  @_needed_size = HEADER_SIZE
  @_state = :receive_header
  buffer_reset
end

#receive_chunk(chunk) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/iproto/em.rb', line 117

def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    else
      chunk = ''
    end
  end
  if @request_id == PING_ID
    @_needed_size = HEADER_SIZE
    @_state = :receive_header
    return
  end
  request = @waiting_requests.delete @request_id
  raise IProto::UnexpectedResponse.new("For request id #{@request_id}") unless request
  @_needed_size = HEADER_SIZE
  @_state = :receive_header
  do_response(request, chunk)
end

#shutdown_hookObject



84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/iproto/em.rb', line 84

def shutdown_hook
  unless @shutdown_hook
    ::EM.add_shutdown_hook {
      @connected = @should_reconnect ? :force : false
      if Integer === @reconnect_timer
        ::EM.cancel_timer @reconnect_timer
      end
      @reconnect_timer = nil
      @shutdown_hook = false
    }
    @shutdown_hook = true
  end
end

#unbindObject



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/iproto/em.rb', line 227

def unbind
  _stop_pinger
  prev_connected = @connected
  @connected = false
  discard_requests
  @connected = prev_connected

  case @should_reconnect
  when true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  when false
    if @connected == :init_waiting
      raise IProto::CouldNotConnect
    else
      raise IProto::Disconnected
    end
  when nil
    # do nothing cause we explicitely disconnected
  end
end