Class: Ably::Realtime::Connection::ConnectionManager Private

Inherits:
Object
Defined in:
lib/ably/realtime/connection/connection_manager.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

ConnectionManager is responsible for all actions relating to the underlying connection and transports, such as opening, closing and attempting reconnects. Connection state changes are performed by this class and are executed from ConnectionStateMachine.

This is a private class and should never be used directly by developers as the API is likely to change in future.

Constant Summary

RESOLVABLE_ERROR_CODES =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Error codes from the server that can potentially be resolved

{
  token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE
}

Constructor Details

#initialize(connection) ⇒ ConnectionManager

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ConnectionManager.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 18

def initialize(connection)
  @connection     = connection
  @timers         = Hash.new { |hash, key| hash[key] = [] }

  connection.unsafe_on(:closed) do
    connection.reset_resume_info
  end

  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end

  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end
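
The `@timers` hash in the constructor uses a default block so that each key lazily receives its own array. A minimal sketch of that idiom in isolation follows; the keys and placeholder values are illustrative only and are not taken from the library.

```ruby
# Each missing key is initialised to its own empty array on first access.
timers = Hash.new { |hash, key| hash[key] = [] }

timers[:connecting] << :timeout_timer # hypothetical placeholder for a timer object
timers[:closing]    << :close_timer

# Contrast: Hash.new([]) hands out ONE shared array and never stores the key.
shared = Hash.new([])
shared[:a] << 1
shared[:b] << 2
```

With the block form, `timers` ends up with two independent arrays. With `Hash.new([])`, both appends land in the same default array and `shared` itself stays empty, which is why the block form is the safer choice for per-key collections.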

Instance Method Details

#close_connection ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Send a Close Models::ProtocolMessage to the server, then force the connection closed and release the transport if it is still closing after realtime_request_timeout



# File 'lib/ably/realtime/connection/connection_manager.rb', line 169

def close_connection
  connection.send_protocol_message(action: Ably::Models::ProtocolMessage::ACTION.Close)

  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end
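
The timeout in `close_connection` only takes effect if the connection is still in the `closing` state when it fires. The implementation of `create_timeout_timer_whilst_in_state` is not shown on this page, so the following is only a sketch of the general idea, with a hand-fired "timer" in place of EventMachine. `SketchConnection` and `StateGuardedTimeout` are hypothetical names.

```ruby
# Hypothetical stand-in for a connection that only tracks its state.
SketchConnection = Struct.new(:state)

# A "timer" that a caller fires by hand; a real one would be scheduled.
class StateGuardedTimeout
  def initialize(connection, state, &on_timeout)
    @connection = connection
    @state      = state
    @on_timeout = on_timeout
  end

  # Runs the callback only if the connection is still in the guarded state.
  # Returns true when the callback ran, false when it was skipped.
  def fire
    return false unless @connection.state == @state

    @on_timeout.call
    true
  end
end

conn  = SketchConnection.new(:closing)
timer = StateGuardedTimeout.new(conn, :closing) { conn.state = :closed }
timer.fire # still closing, so the forced close runs
```

If the server had acknowledged the close before the timeout, the state would no longer be `:closing` and `fire` would do nothing.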

#connected(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called whenever a new connection is made



# File 'lib/ably/realtime/connection/connection_manager.rb', line 89

def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
      EventMachine.next_tick { connection.trigger_resumed }
      resend_pending_message_ack_queue
    else
      logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
      connection.reset_client_serial
      nack_messages_on_all_channels protocol_message.error
      force_reattach_on_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
    connection.reset_client_serial
  end

  reattach_suspended_channels protocol_message.error

  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial
end
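
The branches in `connected` distinguish three outcomes: a brand new connection, a successful resume, and a failed resume. A small sketch of that decision as a pure function may help; `connection_outcome` and its return symbols are hypothetical names introduced here for illustration.

```ruby
# existing_key / existing_id: what the client held before CONNECTED arrived.
# incoming_id: the connection ID carried by the CONNECTED ProtocolMessage.
def connection_outcome(existing_key, existing_id, incoming_id)
  # No key means there was nothing to resume.
  return :new_connection unless existing_key

  # A matching ID means the server kept the old connection state.
  incoming_id == existing_id ? :resumed : :resume_failed
end

connection_outcome(nil, nil, "abc")     # :new_connection
connection_outcome("key", "abc", "abc") # :resumed
connection_outcome("key", "abc", "xyz") # :resume_failed
```

In the method above, the resumed case resends messages awaiting an ACK, while the failed-resume case resets the client serial, NACKs messages on all channels and forces channels to reattach.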

#connected_update(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When the connection is CONNECTED and receives an update, update the connection details and emit an UPDATE event (#RTN4h)



# File 'lib/ably/realtime/connection/connection_manager.rb', line 119

def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial

  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end

#connection_opening_failed(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by the transport when a connection attempt fails



# File 'lib/ably/realtime/connection/connection_manager.rb', line 71

def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
       (error.code == Ably::Exceptions::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end

  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.message}" }
  next_state = get_next_retry_state_info
  connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.message}", nil, 80000, error)
end
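
The condition in `connection_opening_failed` separates terminal failures from retryable ones. The sketch below restates it as a standalone predicate. The numeric values are assumptions made for illustration (a token-expired range of 40140..40149 and an invalid client ID code of 40012); check `Ably::Exceptions` for the real constants.

```ruby
# Assumed values for this sketch only.
SKETCH_TOKEN_EXPIRED_RANGE = (40140..40149)
SKETCH_INVALID_CLIENT_ID   = 40012

# True when the error should move the connection straight to :failed.
def terminal_failure?(status, code)
  auth_failure = [401, 403].include?(status)

  # An auth failure is terminal unless it is a token expiry, which the
  # client can resolve by renewing the token. An invalid client ID is
  # always terminal.
  (auth_failure && !SKETCH_TOKEN_EXPIRED_RANGE.include?(code)) ||
    code == SKETCH_INVALID_CLIENT_ID
end

terminal_failure?(401, 40101) # true:  auth failure that is not a token expiry
terminal_failure?(401, 40142) # false: token expiry, retry after renewing
terminal_failure?(500, 50000) # false: not an auth failure, retry
terminal_failure?(400, 40012) # true:  invalid client ID
```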

#destroy_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ensures the underlying transport has been disconnected and all event emitter callbacks removed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 141

def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end

#detach_active_channels ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Moves every channel that is attached, attaching or detaching to the detaching state



# File 'lib/ably/realtime/connection/connection_manager.rb', line 289

def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end
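
`detach_active_channels`, `fail_active_channels` and `suspend_active_channels` all follow the same select-then-each pattern over channel states. A minimal sketch with a stand-in channel type, where `SketchChannel` and `SKETCH_ACTIVE_STATES` are hypothetical and a plain assignment replaces the real state machine transition:

```ruby
# Hypothetical channel that only carries a name and a state.
SketchChannel = Struct.new(:name, :state)

SKETCH_ACTIVE_STATES = %i[attached attaching detaching].freeze

channels = [
  SketchChannel.new("a", :attached),
  SketchChannel.new("b", :initialized),
  SketchChannel.new("c", :attaching),
  SketchChannel.new("d", :detached)
]

# Pick the active channels first, then move each of them to the new state.
channels
  .select { |channel| SKETCH_ACTIVE_STATES.include?(channel.state) }
  .each   { |channel| channel.state = :detaching }

channels.map(&:state) # [:detaching, :initialized, :detaching, :detached]
```

Channels that are initialized or already detached are left untouched, which matches the filter used in the methods above.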

#error_received_from_server(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

ProtocolMessage Error received from server. Some error states can be resolved by the client library.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 245

def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.message}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end
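
The `case` in `error_received_from_server` matches `error.code` against `RESOLVABLE_ERROR_CODES.fetch(:token_expired)`. Since `when` compares with `===`, this works for a whole family of codes if the constant is a Range, which is consistent with other methods on this page calling `include?` on it. The sketch below assumes such a range; the bounds and names are illustrative.

```ruby
# Assumption for this sketch: token-expired codes form a Range.
SKETCH_TOKEN_EXPIRED_CODES = (40140..40149)

# `when` uses ===, and Range#=== tests membership, so any code inside
# the range selects the retry branch.
def next_action_for(code)
  case code
  when SKETCH_TOKEN_EXPIRED_CODES then :retry
  else :failed
  end
end

next_action_for(40142) # :retry
next_action_for(40100) # :failed
```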

#fail(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Connection has failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 188

def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end

#fail_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Moves every channel that is attached, attaching, detaching or suspended to the failed state, with the provided error as the reason



# File 'lib/ably/realtime/connection/connection_manager.rb', line 298

def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end

#force_close_connection ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Close the underlying transport immediately and set the connection state to closed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 180

def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end

#nack_messages_on_all_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection



# File 'lib/ably/realtime/connection/connection_manager.rb', line 308

def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager.fail_messages_awaiting_ack error, immediately: true
    channel.manager.fail_queued_messages error
  end
end

#reconnect_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Reconnect the WebsocketTransport if possible, otherwise set up a new transport



# File 'lib/ably/realtime/connection/connection_manager.rb', line 158

def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end

#release_and_establish_new_transport ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Destroys the existing transport, if any, and sets up a new one



# File 'lib/ably/realtime/connection/connection_manager.rb', line 150

def release_and_establish_new_transport
  destroy_transport
  setup_transport
end

#resend_pending_message_ack_queue ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel, based on the serial numbers.



# File 'lib/ably/realtime/connection/connection_manager.rb', line 269

def resend_pending_message_ack_queue
  connection.__pending_message_ack_queue__.delete_if do |protocol_message|
    if protocol_message.ack_required?
      connection.__outgoing_message_queue__ << protocol_message
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end
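
The method above relies on `delete_if` removing an element whenever the block returns a truthy value. A minimal sketch with plain arrays standing in for the two queues, where `PendingMessage` is a hypothetical stand-in for a protocol message:

```ruby
# Hypothetical message that only knows whether it needs an ACK.
PendingMessage = Struct.new(:id, :ack_required) do
  def ack_required?
    ack_required
  end
end

pending = [
  PendingMessage.new(1, true),
  PendingMessage.new(2, false),
  PendingMessage.new(3, true)
]
outgoing = []

# Messages that need an ACK are moved to the outgoing queue and removed.
# For the others the `if` evaluates to nil, so they stay in `pending`.
pending.delete_if do |message|
  if message.ack_required?
    outgoing << message
    true
  end
end

outgoing.map(&:id) # [1, 3]
pending.map(&:id)  # [2]
```

The explicit `true` matters: without it the block would return the result of the last expression in the branch, which is easy to break when the body changes.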

#reset_liveness_timer ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The liveness timer ensures that a connection that has not heard from Ably within heartbeat_interval is moved to the disconnected state automatically



# File 'lib/ably/realtime/connection/connection_manager.rb', line 317

def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped";
      error = Ably::Exceptions::ConnectionTimeout.new(msg, 80003, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end
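
The check inside the liveness timer compares the time since the last sign of life with `heartbeat_interval`. The sketch below isolates that comparison with an injected clock so it can be exercised without EventMachine. `LivenessCheck` and its method names are hypothetical and are not part of the library.

```ruby
# Hypothetical liveness check with an injectable clock (seconds as a Float).
class LivenessCheck
  def initialize(heartbeat_interval, clock: -> { Time.now.to_f })
    @heartbeat_interval = heartbeat_interval
    @clock              = clock
    @last_activity      = clock.call
  end

  # Record that something was received from the server.
  def activity_seen
    @last_activity = @clock.call
  end

  # True when nothing has been heard for at least heartbeat_interval seconds.
  def timed_out?
    (@clock.call - @last_activity) >= @heartbeat_interval
  end
end

now   = 0.0
check = LivenessCheck.new(15, clock: -> { now })

now = 10.0
check.timed_out?    # false, only 10s of silence
check.activity_seen # a message arrives at t = 10

now = 25.0
check.timed_out?    # true, 15s since the last activity
```

The real method schedules its timer slightly later than `heartbeat_interval` (by 0.1s), so the `>=` comparison has had time to become true when the timer fires.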

#respond_to_transport_disconnected_when_connecting(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 200

def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising

  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end

  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end

  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end

#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed



# File 'lib/ably/realtime/connection/connection_manager.rb', line 226

def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end

  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end

  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end

#retry_count_for_state(state) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Number of consecutive attempts for provided state

Returns:

  • (Integer)


# File 'lib/ably/realtime/connection/connection_manager.rb', line 259

def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end
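
`retries_for_state` is not shown on this page, so its exact behaviour is unknown here. As a rough illustration of what "consecutive attempts, ignoring :connecting" could mean, the sketch below counts trailing occurrences of a state in a state history after dropping ignored states. `consecutive_retries` and the history format are assumptions, not the library's implementation.

```ruby
# Hypothetical: count how many times `state` appears consecutively at the
# end of a state history, skipping over any states in ignore_states.
def consecutive_retries(history, state, ignore_states: [])
  history
    .reject { |entry| ignore_states.include?(entry) }
    .reverse
    .take_while { |entry| entry == state }
    .count
end

history = %i[connected disconnected connecting disconnected connecting disconnected]

consecutive_retries(history, :disconnected, ignore_states: [:connecting]) # 3
consecutive_retries(history, :disconnected)                               # 1
```

Ignoring `:connecting` is what lets repeated disconnected, connecting, disconnected cycles count as consecutive failures rather than resetting the count on every attempt.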

#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates and sets up a new WebsocketTransport available on attribute #transport

Yields:

  • (Ably::Realtime::Connection::WebsocketTransport)


# File 'lib/ably/realtime/connection/connection_manager.rb', line 40

def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end

  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, 40103)
    return
  end

  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }

  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end
    socket_deferrable.errback do |error|
      connection_opening_failed error
    end
  end

  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, 80014)
  end
end
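
`setup_transport` registers a success callback and an error callback on the object returned by `create_websocket_transport`. The sketch below shows that callback/errback pattern with a tiny stand-in class, so it runs without EventMachine. `SketchDeferrable` is hypothetical and far simpler than a real deferrable.

```ruby
# Minimal stand-in for a deferrable: callbacks run on succeed, errbacks on fail.
class SketchDeferrable
  def initialize
    @callbacks = []
    @errbacks  = []
  end

  def callback(&block)
    @callbacks << block
    self
  end

  def errback(&block)
    @errbacks << block
    self
  end

  def succeed(value)
    @callbacks.each { |registered| registered.call(value) }
  end

  def fail(error)
    @errbacks.each { |registered| registered.call(error) }
  end
end

events = []

# tap yields the deferrable for wiring and then returns it, as in the method above.
deferrable = SketchDeferrable.new.tap do |socket_deferrable|
  socket_deferrable.callback { |transport| events << [:opened, transport] }
  socket_deferrable.errback  { |error| events << [:failed, error] }
end

deferrable.succeed(:websocket) # events is now [[:opened, :websocket]]
```

Only one of the two paths runs for a given attempt: success hands the transport to the subscriber, while failure would be routed to the equivalent of `connection_opening_failed`.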

#suspend_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Moves every channel that is attached, attaching or detaching to the suspended state, with the provided error as the reason



# File 'lib/ably/realtime/connection/connection_manager.rb', line 280

def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end