Class: IProto::EMConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
ConnectionAPI, FixedLengthProtocol
Defined in:
lib/iproto/em.rb

Direct Known Subclasses

EMCallbackConnection, EMFiberedConnection

Defined Under Namespace

Modules: FixedLengthProtocol

Constant Summary

Constants included from ConnectionAPI

ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::HEADER_SIZE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FixedLengthProtocol

#post_init, #receive_data

Methods included from ConnectionAPI

#next_request_id, #pack_request, #send_request

Constructor Details

#initialize(host, port, reconnect = true) ⇒ EMConnection

Returns a new instance of EMConnection.



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/iproto/em.rb', line 36

# Sets up a connection object in the +:init_waiting+ state with empty
# request queues, then initializes the protocol parser and registers the
# shutdown hook.
#
# @param host [Object] stored in @host; later handed to +reconnect+
# @param port [Object] stored in @port; later handed to +reconnect+
# @param reconnect [Boolean, Numeric, nil] any truthy value enables automatic
#   reconnection; a Numeric value is also used as the reconnect timeout,
#   otherwise DEFAULT_RECONNECT is used
def initialize(host, port, reconnect = true)
  @host = host
  @port = port
  @should_reconnect = reconnect ? true : false
  @reconnect_timeout = reconnect.is_a?(Numeric) ? reconnect : DEFAULT_RECONNECT
  @reconnect_timer = nil
  @connected = :init_waiting
  @waiting_for_connect = []
  @waiting_requests = {}
  init_protocol
  # The flag has to be false before shutdown_hook runs, otherwise the hook
  # would not be registered.
  @shutdown_hook = false
  shutdown_hook
end

Instance Attribute Details

#_needed_size ⇒ Object (readonly)

Returns the value of attribute _needed_size.



78
79
80
# File 'lib/iproto/em.rb', line 78

# Reader for @_needed_size. Within this class the value is set to
# HEADER_SIZE by init_protocol and receive_chunk, and to the body size
# announced by a header while a body is pending.
# NOTE(review): presumably read by FixedLengthProtocol to decide how many
# bytes to buffer before calling receive_chunk — confirm in that module.
def _needed_size
  @_needed_size
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



34
35
36
# File 'lib/iproto/em.rb', line 34

# Reader for @host, the host value given to the constructor and reused on
# every reconnect attempt.
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



34
35
36
# File 'lib/iproto/em.rb', line 34

# Reader for @port, the port value given to the constructor and reused on
# every reconnect attempt.
def port
  @port
end

Instance Method Details

#_do_send_request(request_type, body, request) ⇒ Object



156
157
158
159
160
# File 'lib/iproto/em.rb', line 156

# Sends one request over the connection and registers its callback.
#
# Request ids are drawn from next_request_id until one is found that is not
# already a key of @waiting_requests; the packed request is then written
# with send_data and the callback is stored under that id.
#
# @param request_type [Object] passed through to pack_request
# @param body [Object] passed through to pack_request
# @param request [#call] callback stored until the response arrives
def _do_send_request(request_type, body, request)
  request_id = next_request_id
  request_id = next_request_id while @waiting_requests.key?(request_id)
  send_data(pack_request(request_type, request_id, body))
  @waiting_requests[request_id] = request
end

#_perform_waiting_for_connect(real) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/iproto/em.rb', line 139

# Drains the queue of requests that were issued before the connection was
# established.
#
# @param real [Boolean] when truthy, each queued request is scheduled with
#   ::EM.next_tick to be sent through _do_send_request; when falsy, the
#   queued callbacks are moved into @waiting_requests under the placeholder
#   ids -1, -2, ... so that they can be answered by whoever drains that table
# @return [Array] the (now empty) @waiting_for_connect queue
def _perform_waiting_for_connect(real)
  if real
    @waiting_for_connect.each do |(type, payload, callback)|
      ::EM.next_tick { _do_send_request(type, payload, callback) }
    end
  else
    @waiting_for_connect.each_with_index do |(_type, _payload, callback), index|
      @waiting_requests[-(index + 1)] = callback
    end
  end
  @waiting_for_connect.clear
end

#_send_request(request_type, body, request) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/iproto/em.rb', line 122

# Routes a request according to the current connection state.
#
# * connected: the request is sent immediately;
# * a connection may still come up (see could_be_connected?): the request is
#   queued in @waiting_for_connect, and an immediate reconnect is triggered
#   when the state is +:force+;
# * otherwise the callback receives an IProto::Disconnected error, on the
#   next reactor tick if the reactor is running and synchronously if not.
#
# @param request_type [Object] passed through to _do_send_request
# @param body [Object] passed through to _do_send_request
# @param request [Object] callback handed to do_response / _do_send_request
def _send_request(request_type, body, request)
  return _do_send_request(request_type, body, request) if @connected == true

  if could_be_connected?
    @waiting_for_connect.push([request_type, body, request])
    _setup_reconnect_timer(0) if @connected == :force
  else
    reject = lambda do
      do_response(request, IProto::Disconnected.new("connection is closed"))
    end
    if ::EM.reactor_running?
      EM.next_tick { reject.call }
    else
      reject.call
    end
  end
end

#_setup_reconnect_timer(timeout) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/iproto/em.rb', line 107

# Starts a reconnect attempt unless one is already pending
# (@reconnect_timer is non-nil).
#
# @param timeout [Numeric] 0 reconnects right away and moves the state to
#   +:waiting+; any other value schedules the reconnect through
#   ::EM.add_timer and remembers the timer handle in @reconnect_timer
# @return [Object, nil] nil when an attempt was already pending
def _setup_reconnect_timer(timeout)
  return unless @reconnect_timer.nil?

  # Placeholder marking an attempt as in progress until a real timer handle
  # (if any) replaces it.
  @reconnect_timer = :waiting
  shutdown_hook
  if timeout == 0
    @connected = :waiting
    reconnect(@host, @port)
  else
    @reconnect_timer = ::EM.add_timer(timeout) { reconnect(@host, @port) }
  end
end

#close ⇒ Object



162
163
164
# File 'lib/iproto/em.rb', line 162

# Explicitly closes the connection by delegating to close_connection with
# +false+, which also disables reconnection and discards pending requests.
# NOTE(review): the +false+ is forwarded to the superclass close_connection,
# where it is presumably the "after writing" flag — confirm against
# EventMachine.
def close
  close_connection(false)
end

#close_connection(*args) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/iproto/em.rb', line 166

# Explicit shutdown of the connection.
#
# Disables reconnection (@should_reconnect becomes nil, which unbind treats
# as "closed on purpose"), cancels a scheduled reconnect timer, forwards to
# the superclass only when the connection is actually established, and
# finally answers every pending request via discard_requests.
#
# @param args [Array] forwarded unchanged to the superclass implementation
def close_connection(*args)
  @should_reconnect = nil
  # Only Integer values are timer handles; :waiting is a placeholder.
  ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
  @reconnect_timer = nil

  super(*args) if @connected == true
  @connected = false
  discard_requests
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/iproto/em.rb', line 50

# @return [Boolean] true only when @connected is exactly +true+; +false+ and
#   the intermediate symbol states (:init_waiting, :waiting, :force) all
#   count as not connected
def connected?
  @connected == true
end

#connection_completed ⇒ Object



72
73
74
75
76
# File 'lib/iproto/em.rb', line 72

# Marks the connection as established, clears any reconnect bookkeeping and
# flushes the requests that were queued while waiting for the connection.
# NOTE(review): presumably the EventMachine callback of the same name —
# confirm against EM::Connection.
def connection_completed
  @connected = true
  @reconnect_timer = nil
  _perform_waiting_for_connect true
end

#could_be_connected? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/iproto/em.rb', line 54

# Tells whether a request may still be delivered on this connection.
#
# @return [Object] the falsy @connected value itself when the connection is
#   closed; +true+ for every other state except +:force+; for +:force+ the
#   result of ::EM.reactor_running?, since a forced reconnect needs a
#   running reactor
def could_be_connected?
  return @connected unless @connected
  return true unless @connected == :force

  ::EM.reactor_running?
end

#discard_requests ⇒ Object



180
181
182
183
184
185
186
187
# File 'lib/iproto/em.rb', line 180

# Answers every outstanding request with an IProto::Disconnected error.
#
# Requests still queued for a connection are first moved into
# @waiting_requests (under placeholder ids), then each registered callback
# is removed from the table and invoked with the same exception instance.
#
# The loop walks a snapshot of the keys, so a callback may modify the table
# while it runs. In particular, a callback that re-enters this method (for
# example by closing the connection) drains the table itself; entries that
# are already gone are skipped instead of calling do_response with nil.
#
# @return [Array] the snapshot of request ids that was iterated
def discard_requests
  exc = IProto::Disconnected.new("discarded cause of disconnect")
  _perform_waiting_for_connect(false)
  @waiting_requests.keys.each do |req|
    request = @waiting_requests.delete(req)
    # Already answered by a re-entrant call made from an earlier callback.
    next if request.nil?

    do_response request, exc
  end
end

#do_response(request, data) ⇒ Object



103
104
105
# File 'lib/iproto/em.rb', line 103

# Delivers a result to a request by invoking the request's +call+ method.
#
# @param request [#call] the callback registered for the request
# @param data [Object] response body, or an error object such as
#   IProto::Disconnected
# @return [Object] whatever the callback returns
def do_response(request, data)
  request.call(data)
end

#init_protocol ⇒ Object



79
80
81
82
# File 'lib/iproto/em.rb', line 79

# Puts the response parser into its initial state: expecting a header of
# HEADER_SIZE bytes. Called from the constructor.
def init_protocol
  @_needed_size = HEADER_SIZE
  @_state = :receive_header
end

#receive_chunk(chunk) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/iproto/em.rb', line 84

# Consumes one fixed-size chunk of the response stream.
#
# The parser alternates between two states:
# * +:receive_header+ — the chunk is a header; the body size is read with
#   ::BinUtils.get_int32_le at offset 4 and the request id at offset 8. A
#   positive body size switches to +:receive_body+ and waits for that many
#   bytes; otherwise the response is complete with an empty body.
# * +:receive_body+ — the chunk is the body of the response whose header was
#   read previously.
#
# A complete response is handed to the callback registered under its request
# id. The parser is reset to +:receive_header+ before the callback lookup, so
# that an unexpected response does not leave it waiting for a body and
# misreading the next header.
#
# @param chunk [String] header or body bytes, depending on the current state
# @raise [IProto::UnexpectedResponse] when no request is waiting for the id
def receive_chunk(chunk)
  if @_state == :receive_header
    body_size = ::BinUtils.get_int32_le(chunk, 4)
    @request_id = ::BinUtils.get_int32_le(chunk, 8)
    if body_size > 0
      @_needed_size = body_size
      @_state = :receive_body
      return
    end
    chunk = ''
  end

  # Reset first: the raise below must not leave the parser in :receive_body.
  @_needed_size = HEADER_SIZE
  @_state = :receive_header

  request = @waiting_requests.delete(@request_id)
  raise IProto::UnexpectedResponse.new("For request id #{@request_id}") unless request

  do_response(request, chunk)
end

#shutdown_hook ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/iproto/em.rb', line 58

# Registers, at most once at a time, a block with ::EM.add_shutdown_hook.
#
# When that block runs it moves the connection to +:force+ if reconnection
# is enabled (so a later request can trigger a reconnect) or to +false+
# otherwise, cancels a scheduled reconnect timer, and clears the
# registration flag so the hook can be installed again.
#
# @return [true, nil] true when the hook was registered by this call, nil
#   when one was already registered
def shutdown_hook
  return if @shutdown_hook

  ::EM.add_shutdown_hook do
    @connected = @should_reconnect ? :force : false
    # Only Integer values are timer handles; :waiting is a placeholder.
    ::EM.cancel_timer(@reconnect_timer) if @reconnect_timer.is_a?(Integer)
    @reconnect_timer = nil
    @shutdown_hook = false
  end
  @shutdown_hook = true
end

#unbind ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/iproto/em.rb', line 189

# Handles loss of the connection.
#
# All pending requests are answered with a disconnect error first; while
# that happens @connected is temporarily +false+ so that requests issued
# from the callbacks are rejected rather than queued. What follows depends
# on @should_reconnect:
# * +true+  — schedule a reconnect after @reconnect_timeout, unless the
#   state is +:force+ (set by the shutdown hook), in which case the next
#   request triggers the reconnect;
# * +false+ — mark the connection as closed and raise;
# * +nil+   — the connection was closed on purpose, nothing to do.
#
# @raise [IProto::CouldNotConnect] reconnection is disabled and the
#   connection was never established
# @raise [IProto::Disconnected] reconnection is disabled and an established
#   connection was lost
def unbind
  prev_connected = @connected
  @connected = false
  discard_requests
  @connected = prev_connected

  case @should_reconnect
  when true
    @reconnect_timer = nil
    unless @connected == :force
      @connected = false
      _setup_reconnect_timer(@reconnect_timeout)
    end
  when false
    never_connected = @connected == :init_waiting
    # The connection is gone for good: do not leave a stale true or
    # :init_waiting behind, or connected?/could_be_connected? would keep
    # reporting a usable connection after the error is raised.
    @connected = false
    raise IProto::CouldNotConnect if never_connected

    raise IProto::Disconnected
  when nil
    # do nothing because we explicitly disconnected
  end
end