Class: Ably::Realtime::Connection::ConnectionManager Private

Inherits:
Object
Defined in:
lib/ably/realtime/connection/connection_manager.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

ConnectionManager is responsible for all actions relating to the underlying connection and transports, such as opening, closing and attempting reconnects. Connection state changes are performed by this class and executed from ConnectionStateMachine.

This is a private class and should never be used directly by developers as the API is likely to change in future.

Constant Summary

RESOLVABLE_ERROR_CODES =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Error codes from the server that can potentially be resolved

{
  token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE
}
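The methods on this page test the fetched value with both `include?` and `case`/`when`, which works when the value is a Range of codes. The following is a minimal sketch of that pattern, not the library's code. The constant names and the range bounds are assumptions made for illustration; this page does not show the real value of `Ably::Exceptions::TOKEN_EXPIRED_CODE`.

```ruby
# Hypothetical stand-in for Ably::Exceptions::TOKEN_EXPIRED_CODE.
# Assumption: the real constant is a Range; these bounds are illustrative.
TOKEN_EXPIRED_CODE_EXAMPLE = (40140..40149).freeze

RESOLVABLE_CODES_EXAMPLE = {
  token_expired: TOKEN_EXPIRED_CODE_EXAMPLE
}.freeze

# Predicate form, mirroring the #include? checks used on this page
def resolvable_code?(code)
  RESOLVABLE_CODES_EXAMPLE.fetch(:token_expired).include?(code)
end

# case/when form: Range#=== performs the same membership test
def classify_code(code)
  case code
  when RESOLVABLE_CODES_EXAMPLE.fetch(:token_expired) then :retry
  else :failed
  end
end
```

Using `fetch` rather than `[]` means a misspelt key raises `KeyError` instead of silently returning `nil`.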


Constructor Details

#initialize(connection) ⇒ ConnectionManager

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ConnectionManager.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 18

def initialize(connection)
  @connection     = connection
  @timers         = Hash.new { |hash, key| hash[key] = [] }

  connection.unsafe_on(:closed) do
    connection.reset_resume_info
  end

  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end

  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end
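The `@timers` hash above is built with a default block, so each key gets its own empty Array on first access. A small sketch of that idiom in isolation follows; `build_timer_registry` is a hypothetical helper name, and the symbols stored in it stand in for real timer objects.

```ruby
# Builds a registry like @timers in the constructor: a missing key is
# initialised to its own empty Array the first time it is read.
def build_timer_registry
  Hash.new { |hash, key| hash[key] = [] }
end

# Contrast: Hash.new([]) hands back one shared Array for every missing key
# and never stores it, so appended values do not create keys.
def build_shared_default_registry
  Hash.new([])
end
```

With the block form, `registry[:connecting] << timer` both creates the `:connecting` entry and appends to it, which is presumably why the constructor uses it.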

Instance Method Details

#close_connection ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Send a Close Models::ProtocolMessage to the server and release the transport



# File 'lib/ably/realtime/connection/connection_manager.rb', line 185

def close_connection
  connection.send_protocol_message(action: Ably::Models::ProtocolMessage::ACTION.Close)

  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end

#connected(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called whenever a new connection is made



# File 'lib/ably/realtime/connection/connection_manager.rb', line 107

def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
      EventMachine.next_tick { connection.trigger_resumed }
      resend_pending_message_ack_queue
    else
      logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
      nack_messages_on_all_channels protocol_message.error
      force_reattach_on_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
  end

  reattach_suspended_channels protocol_message.error

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial
end
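The branching in `#connected` reduces to a three-way decision based on whether a connection key already exists and whether the incoming connection ID matches the current one. A minimal sketch of that decision as a pure function is shown here; `classify_connected` and its keyword arguments are hypothetical names, not part of the library.

```ruby
# Classifies a CONNECTED message the way #connected branches.
# existing_key and existing_id describe the connection before the message
# arrived; incoming_id is the connection ID carried by the message.
def classify_connected(existing_key:, existing_id:, incoming_id:)
  return :new_connection unless existing_key
  incoming_id == existing_id ? :resumed : :not_resumed
end
```

In the method above, the `:resumed` case resends pending messages, while the `:not_resumed` case NACKs messages and forces channels to reattach.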

#connected_update(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When the connection is CONNECTED and receives an update, update the Connection details and emit an UPDATE event (RTN4h)



# File 'lib/ably/realtime/connection/connection_manager.rb', line 135

def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial

  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end

#connection_opening_failed(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by the transport when a connection attempt fails



# File 'lib/ably/realtime/connection/connection_manager.rb', line 84

def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
       (error.code == Ably::Exceptions::Codes::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end

  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.message}" }
  next_state = get_next_retry_state_info

  if connection.state == next_state.fetch(:state)
    logger.error { "ConnectionManager: Skipping next retry state after connection opening failed as already in state #{next_state}\n#{caller[0..20].join("\n")}" }
  else
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.message}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED, error)
  end
end
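The first condition in `#connection_opening_failed` decides whether a failure is terminal. A hedged sketch of that predicate on its own follows. `ExampleError`, `terminal_failure?` and both constants are hypothetical stand-ins, and their numeric values are assumptions rather than values confirmed by this page.

```ruby
# Hypothetical stand-ins; the real values live in Ably::Exceptions.
EXAMPLE_TOKEN_EXPIRED_RANGE    = (40140..40149) # assumed range
EXAMPLE_INVALID_CLIENT_ID_CODE = 40012          # assumed value

ExampleError = Struct.new(:status, :code)

# True when the connection should move straight to :failed:
# an auth rejection (401/403) that is not a token expiry, or an
# invalid client ID regardless of HTTP status.
def terminal_failure?(error)
  auth_rejected = [401, 403].include?(error.status) &&
                  !EXAMPLE_TOKEN_EXPIRED_RANGE.include?(error.code)
  auth_rejected || error.code == EXAMPLE_INVALID_CLIENT_ID_CODE
end
```

Any error for which this returns false falls through to the retry path, where the next state and pause are looked up.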

#destroy_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ensures the underlying transport has been disconnected and all event emitter callbacks removed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 157

def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end

#detach_active_channels ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 307

def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end

#error_received_from_server(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

ProtocolMessage Error received from server. Some error states can be resolved by the client library.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 261

def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.message}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end

#fail(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Connection has failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 204

def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end

#fail_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 316

def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end

#force_close_connection ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Close the underlying transport immediately and set the connection state to closed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 196

def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end

#nack_messages_on_all_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection



# File 'lib/ably/realtime/connection/connection_manager.rb', line 335

def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager.fail_messages_awaiting_ack error, immediately: true
    channel.manager.fail_queued_messages error
  end
end

#reconnect_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Reconnect the WebsocketTransport if possible, otherwise set up a new transport



# File 'lib/ably/realtime/connection/connection_manager.rb', line 174

def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end

#reintialize_failed_chanels ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 325

def reintialize_failed_chanels
  channels.select do |channel|
    channel.failed?
  end.each do |channel|
    channel.transition_state_machine :initialized
  end
end

#release_and_establish_new_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 166

def release_and_establish_new_transport
  destroy_transport
  setup_transport
end

#resend_pending_message_ack_queue ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel, based on the serial numbers



# File 'lib/ably/realtime/connection/connection_manager.rb', line 287

def resend_pending_message_ack_queue
  connection.__pending_message_ack_queue__.delete_if do |protocol_message|
    if protocol_message.ack_required?
      connection.__outgoing_message_queue__ << protocol_message
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end
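The method above relies on `delete_if` removing only the elements for which the block returns a truthy value; when the `if` is not taken the block evaluates to `nil`, so the element stays. A small self-contained sketch of that move-between-queues idiom follows, using plain Arrays. `ExampleMessage` and `requeue_pending` are hypothetical names, and the message bus publish step is omitted.

```ruby
ExampleMessage = Struct.new(:name, :ack_required) do
  def ack_required?
    ack_required
  end
end

# Moves every message that still needs an ACK from pending to outgoing.
# The block returns true for moved messages (deleted from pending) and
# nil for the rest, which delete_if treats as "keep".
def requeue_pending(pending, outgoing)
  pending.delete_if do |message|
    if message.ack_required?
      outgoing << message
      true
    end
  end
end
```

Messages are appended to the outgoing queue in their original order, so resent messages keep their relative ordering.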

#reset_liveness_timer ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The liveness timer ensures that a connection that has not heard from Ably within heartbeat_interval is moved to the disconnected state automatically



# File 'lib/ably/realtime/connection/connection_manager.rb', line 344

def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped";
      error = Ably::Exceptions::ConnectionTimeout.new(msg, Ably::Exceptions::Codes::DISCONNECTED, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end
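The check inside the timer block can be reasoned about separately from EventMachine. The sketch below restates it as pure functions with the clock passed in, purely for illustration; `liveness_expired?`, `liveness_timer_delay` and their arguments are hypothetical and do not exist in the library.

```ruby
# Returns true when a connected client has heard nothing for at least
# heartbeat_interval seconds. All arguments are plain values so the
# check can be exercised without a reactor.
def liveness_expired?(connected:, last_activity_at:, now:, heartbeat_interval:)
  connected && (now - last_activity_at) >= heartbeat_interval
end

# Delay used when scheduling the check: slightly longer than the interval,
# so that by the time the timer fires a full interval has elapsed.
def liveness_timer_delay(heartbeat_interval)
  heartbeat_interval + 0.1
end
```

Because the method cancels any existing timer before creating a new one, calling it on every sign of activity keeps pushing the check further into the future.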

#respond_to_transport_disconnected_when_connecting(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 216

def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising

  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end

  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end

  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end

#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 242

def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end

  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end

  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end

#retry_count_for_state(state) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Number of consecutive attempts for provided state

Returns:

  • (Integer)


# File 'lib/ably/realtime/connection/connection_manager.rb', line 277

def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end

#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates and sets up a new WebsocketTransport available on attribute #transport

Yields:

  • (Ably::Realtime::Connection::WebsocketTransport)



# File 'lib/ably/realtime/connection/connection_manager.rb', line 40

def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end

  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, Ably::Exceptions::Codes::INVALID_USE_OF_BASIC_AUTH_OVER_NONTLS_TRANSPORT)
    return
  end

  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }

  # The socket attempt can fail at the same time as a timer firing so ensure
  #   only one outcome is processed from this setup attempt
  setup_attempt_status = {}
  setup_failed = lambda do
    return true if setup_attempt_status[:failed]
    setup_attempt_status[:failed] = true
    false
  end

  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end
    socket_deferrable.errback do |error|
      next if setup_failed.call
      connection_opening_failed error
    end
  end

  # The connection request timeout must be marginally higher than the REST request timeout to ensure
  #   any HTTP auth request failure due to timeout triggers before the connection timer kicks in
  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    next if setup_failed.call
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, Ably::Exceptions::Codes::CONNECTION_TIMED_OUT)
  end
end
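The `setup_failed` lambda above acts as a run-once guard: the first caller gets `false` and proceeds, and every later caller gets `true` and skips. A minimal sketch of that guard outside the transport code follows; `build_once_guard` and `simulate_race` are hypothetical helpers, and the two symbols stand in for the errback and the timeout timer.

```ruby
# Returns a lambda that answers false on its first call and true afterwards,
# so whichever of two competing callbacks runs first "wins".
def build_once_guard
  status = {}
  lambda do
    return true if status[:failed] # `return` exits only the lambda
    status[:failed] = true
    false
  end
end

# Simulates the errback and the timeout timer both firing for one attempt.
def simulate_race
  already_handled = build_once_guard
  handled_by = []
  [:errback, :timeout_timer].each do |source|
    next if already_handled.call
    handled_by << source
  end
  handled_by
end
```

A lambda is used rather than a proc because `return` inside a lambda returns from the lambda itself, which makes the early exit safe inside callbacks.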

#suspend_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 298

def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end
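`#suspend_active_channels`, `#detach_active_channels` and `#fail_active_channels` share one shape: select channels by state, then transition each one. A hedged sketch of that shape using a plain Struct follows; `ExampleChannel` and `suspend_active` are hypothetical, and assigning `state` directly stands in for the real state machine transition.

```ruby
ExampleChannel = Struct.new(:name, :state) do
  # Mirrors the attached? || attaching? || detaching? filter used above
  def active?
    %i[attached attaching detaching].include?(state)
  end
end

# Moves every active channel to :suspended and returns the affected channels.
def suspend_active(channels)
  channels.select(&:active?).each { |channel| channel.state = :suspended }
end
```

Note that `#fail_active_channels` on this page uses a wider filter that also includes suspended channels.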