Class: Ably::Realtime::Connection::ConnectionManager Private
- Inherits: Object
  - Object
  - Ably::Realtime::Connection::ConnectionManager
- Defined in: lib/ably/realtime/connection/connection_manager.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
ConnectionManager is responsible for all actions relating to the underlying connection and transports, such as opening, closing, and attempting reconnects. Connection state changes are performed by this class and executed from ConnectionStateMachine.
This is a private class and should never be used directly by developers as the API is likely to change in future.
Constant Summary
- RESOLVABLE_ERROR_CODES = { token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE }
  This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
  Error codes from the server that can potentially be resolved.
Instance Method Summary
-
#close_connection ⇒ Object
private
Send a Close Models::ProtocolMessage to the server and release the transport.
-
#connected(protocol_message) ⇒ Object
private
Called whenever a new connection is made.
-
#connected_update(protocol_message) ⇒ Object
private
When the connection is CONNECTED and receives an update, update the Connection details and emit an UPDATE event (RTN4h).
-
#connection_opening_failed(error) ⇒ Object
private
Called by the transport when a connection attempt fails.
-
#destroy_transport ⇒ Object
private
Ensures the underlying transport has been disconnected and all event emitter callbacks removed.
- #detach_active_channels ⇒ Object private
-
#error_received_from_server(error) ⇒ Object
private
ProtocolMessage Error received from server.
-
#fail(error) ⇒ Object
private
Connection has failed.
- #fail_active_channels(error) ⇒ Object private
-
#force_close_connection ⇒ Object
private
Close the underlying transport immediately and set the connection state to closed.
-
#initialize(connection) ⇒ ConnectionManager
constructor
private
A new instance of ConnectionManager.
-
#nack_messages_on_all_channels(error) ⇒ Object
private
When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection.
-
#reconnect_transport ⇒ Object
private
Reconnect the WebsocketTransport if possible, otherwise set up a new transport.
- #release_and_establish_new_transport ⇒ Object private
-
#resend_pending_message_ack_queue ⇒ Object
private
Any messages sent before an ACK/NACK was received on the previous transport need to be resent to the Ably service so that a subsequent ACK/NACK is received.
-
#reset_liveness_timer ⇒ Object
private
Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically.
-
#respond_to_transport_disconnected_when_connecting(error) ⇒ Object
private
When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed.
-
#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object
private
When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed.
-
#retry_count_for_state(state) ⇒ Integer
private
Number of consecutive attempts for provided state.
-
#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object
private
Creates and sets up a new WebsocketTransport available on attribute #transport.
- #suspend_active_channels(error) ⇒ Object private
Constructor Details
#initialize(connection) ⇒ ConnectionManager
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ConnectionManager.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 18
def initialize(connection)
  @connection = connection
  @timers = Hash.new { |hash, key| hash[key] = [] }
  connection.unsafe_on(:closed) do
    connection.reset_resume_info
  end
  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end
  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end
Instance Method Details
#close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a Close Models::ProtocolMessage to the server and release the transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 167
def close_connection
  connection.send_protocol_message(action: Ably::Models::ProtocolMessage::ACTION.Close)
  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end
#connected(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called whenever a new connection is made
# File 'lib/ably/realtime/connection/connection_manager.rb', line 89
def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id
  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details
  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
      EventMachine.next_tick { connection.trigger_resumed }
      resend_pending_message_ack_queue
    else
      logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
      nack_messages_on_all_channels protocol_message.error
      force_reattach_on_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
  end
  reattach_suspended_channels protocol_message.error
  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial
end
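The three outcomes that #connected distinguishes (resumed, not resumed, brand new) can be sketched as a small pure function. This is only an illustration: `connection_outcome` and its keyword arguments are hypothetical names and are not part of the Ably library.

```ruby
# Hypothetical helper, not part of the Ably library. It mirrors the branches
# of #connected: a prior connection key means a resume was attempted, and the
# resume succeeded only if the server echoed back the same connection ID.
def connection_outcome(existing_key:, existing_id:, new_id:)
  return :new_connection unless existing_key # no prior key, nothing to resume
  new_id == existing_id ? :resumed : :not_resumed
end

connection_outcome(existing_key: "key-1", existing_id: "abc", new_id: "abc")
```

In the `:not_resumed` case the method above NACKs outstanding messages and forces channels to reattach, because continuity with the old connection has been lost.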
#connected_update(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When the connection is CONNECTED and receives an update, update the Connection details and emit an UPDATE event (RTN4h).
# File 'lib/ably/realtime/connection/connection_manager.rb', line 117
def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id
  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details
  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial
  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end
#connection_opening_failed(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by the transport when a connection attempt fails
# File 'lib/ably/realtime/connection/connection_manager.rb', line 71
def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
        (error.code == Ably::Exceptions::Codes::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end
  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.message}" }
  next_state = get_next_retry_state_info
  connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.message}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED, error)
end
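The terminal-versus-retryable decision in the method above can be isolated as a predicate. The sketch below is an illustration under stated assumptions: the numeric values (40140..40149 for token expiry, 40012 for an invalid client ID) are assumed here and should be checked against the library's constants, and `SketchError` and `terminal_failure?` are hypothetical names.

```ruby
# Assumed values for illustration only; verify against Ably::Exceptions.
TOKEN_EXPIRED_RANGE = (40140..40149)
INVALID_CLIENT_ID_CODE = 40012

# Hypothetical stand-in for an Ably exception: just an HTTP status and a code.
SketchError = Struct.new(:status, :code)

# True when the failure cannot be fixed by retrying: either an auth failure
# that is not a token expiry, or an invalid client ID.
def terminal_failure?(error)
  auth_failure = [401, 403].include?(error.status)
  token_expired = TOKEN_EXPIRED_RANGE.include?(error.code)
  (auth_failure && !token_expired) || error.code == INVALID_CLIENT_ID_CODE
end

terminal_failure?(SketchError.new(401, 40142)) # expired token: retryable
```

Anything that is not terminal falls through to the retry path, which asks for the next retry state and pause rather than moving to :failed.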
#destroy_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Ensures the underlying transport has been disconnected and all event emitter callbacks removed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 139
def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end
#detach_active_channels ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 287
def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end
#error_received_from_server(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
ProtocolMessage Error received from server. Some error states can be resolved by the client library.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 243
def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.message}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end
#fail(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Connection has failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 186
def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end
#fail_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 296
def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end
#force_close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Close the underlying transport immediately and set the connection state to closed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 178
def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end
#nack_messages_on_all_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 306
def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager.fail_messages_awaiting_ack error, immediately: true
    channel.manager.fail_queued_messages error
  end
end
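As a rough illustration of what NACKing on loss of continuity means, the sketch below fails every message in two plain arrays and empties them. It is not the library's implementation: `SketchMessage`, its `nack` method, and `nack_all` are hypothetical names, and the real queues live on the connection and channel manager.

```ruby
# Hypothetical message that records the error it was failed with.
class SketchMessage
  attr_reader :failure

  def nack(error)
    @failure = error
  end
end

# Fail every message in both queues, then empty them: neither set can be
# acknowledged by the new connection.
def nack_all(awaiting_ack, queued, error)
  [awaiting_ack, queued].each do |queue|
    queue.each { |message| message.nack(error) }
    queue.clear
  end
end

awaiting = [SketchMessage.new]
queued = [SketchMessage.new]
nack_all(awaiting, queued, "continuity lost")
```

Failing both queues at once keeps publishers from waiting indefinitely on an ACK that the old connection can no longer deliver.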
#reconnect_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reconnect the WebsocketTransport if possible, otherwise set up a new transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 156
def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end
#release_and_establish_new_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 148
def release_and_establish_new_transport
  destroy_transport
  setup_transport
end
#resend_pending_message_ack_queue ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Any messages sent before an ACK/NACK was received on the previous transport need to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel, based on the serial numbers.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 267
def resend_pending_message_ack_queue
  connection.__pending_message_ack_queue__.delete_if do |protocol_message|
    if protocol_message.ack_required?
      connection.__outgoing_message_queue__ << protocol_message
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end
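The requeue pattern described above, moving unacknowledged messages from a pending queue back to an outgoing queue, can be shown with plain arrays. This is a sketch only: `SketchProtocolMessage` and `requeue_unacknowledged` are hypothetical names, and it omits the message-bus notification that the real method performs.

```ruby
# Hypothetical protocol message: an ID plus a flag saying whether the server
# is expected to ACK or NACK it.
SketchProtocolMessage = Struct.new(:id, :ack_required) do
  def ack_required?
    ack_required
  end
end

# Move every message that still needs an ACK to the outgoing queue.
# Array#delete_if removes an element when the block returns a truthy value;
# messages that need no ACK yield nil from the `if` and stay where they are.
def requeue_unacknowledged(pending_ack_queue, outgoing_queue)
  pending_ack_queue.delete_if do |message|
    if message.ack_required?
      outgoing_queue << message
      true
    end
  end
end

pending = [SketchProtocolMessage.new(1, true), SketchProtocolMessage.new(2, false)]
outgoing = []
requeue_unacknowledged(pending, outgoing)
```

Because resent messages may already have reached the server, this approach relies on the server deduplicating by serial number, as the description above notes.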
#reset_liveness_timer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically
# File 'lib/ably/realtime/connection/connection_manager.rb', line 315
def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped"
      error = Ably::Exceptions::ConnectionTimeout.new(msg, Ably::Exceptions::Codes::DISCONNECTED, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end
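The liveness rule itself (no activity for at least heartbeat_interval means the connection is presumed dropped) does not depend on EventMachine. The sketch below expresses it with an injectable clock so it can be exercised without an event loop. `LivenessCheck` and its methods are hypothetical names for illustration, not part of the library.

```ruby
# Hypothetical liveness check. The clock is injected so the rule can be
# exercised deterministically; by default it uses the monotonic clock.
class LivenessCheck
  def initialize(heartbeat_interval, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @heartbeat_interval = heartbeat_interval
    @clock = clock
    @last_activity_at = @clock.call
  end

  # Call whenever any message or heartbeat arrives from the server.
  def activity_seen
    @last_activity_at = @clock.call
  end

  # True once the silence has lasted at least the heartbeat interval.
  def timed_out?
    (@clock.call - @last_activity_at) >= @heartbeat_interval
  end
end

check = LivenessCheck.new(15)
check.timed_out?
```

The real method schedules its timer slightly after the interval (heartbeat_interval + 0.1), presumably so that the elapsed-time comparison has already crossed the threshold when the timer fires.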
#respond_to_transport_disconnected_when_connecting(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 198
def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising
  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end
  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end
  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end
#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 224
def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end
  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end
  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end
#retry_count_for_state(state) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of consecutive attempts for provided state
# File 'lib/ably/realtime/connection/connection_manager.rb', line 257
def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end
#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates and sets up a new WebsocketTransport available on attribute #transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 40
def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end
  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, Ably::Exceptions::Codes::INVALID_USE_OF_BASIC_AUTH_OVER_NONTLS_TRANSPORT)
    return
  end
  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }
  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end
    socket_deferrable.errback do |error|
      connection_opening_failed error
    end
  end
  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, Ably::Exceptions::Codes::CONNECTION_TIMED_OUT)
  end
end
#suspend_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 278
def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end