Class: Ably::Realtime::Connection

Inherits:
Object
  • Object
show all
Extended by:
Modules::Enum
Includes:
Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
Defined in:
lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb

Overview

Enables the management of a connection to Ably.

Defined Under Namespace

Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport

Constant Summary collapse

STATE =

The current STATE of the connection. Describes the realtime [Connection]Connection object states.

INITIALIZED A connection with this state has been initialized but no connection has yet been attempted. CONNECTING A connection attempt has been initiated. The connecting state is entered as soon as the library

has completed initialization, and is reentered each time connection is re-attempted following disconnection.

CONNECTED A connection exists and is active. DISCONNECTED A temporary failure condition. No current connection exists because there is no network connectivity

or no host is available. The disconnected state is entered if an established connection is dropped,
or if a connection attempt was unsuccessful. In the disconnected state the library will periodically
attempt to open a new connection (approximately every 15 seconds), anticipating that the connection
will be re-established soon and thus connection and channel continuity will be possible. 
In this state, developers can continue to publish messages as they are automatically placed
in a local queue, to be sent as soon as a connection is reestablished. Messages published by 
other clients while this client is disconnected will be delivered to it upon reconnection,
so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery 
is no longer possible and the connection will move to the SUSPENDED state.

SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity

or no host is available. The suspended state is entered after a failed connection attempt if 
there has then been no connection for a period of two minutes. In the suspended state, the library 
will periodically attempt to open a new connection every 30 seconds. Developers are unable to 
publish messages in this state. A new connection attempt can also be triggered by an explicit
call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established, 
channels will be automatically re-attached. The client has been disconnected for too long for them 
to resume from where they left off, so if it wants to catch up on messages published by other clients 
while it was disconnected, it needs to use the History API.

CLOSING An explicit request by the developer to close the connection has been sent to the Ably service.

If a reply is not received from Ably within a short period of time, the connection is forcibly 
terminated and the connection state becomes CLOSED.

CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts

are made automatically by the library, and clients may not publish messages. No connection state is 
preserved by the service or by the library. A new connection attempt can be triggered by an explicit
call to {Ably::Realtime::Connection#connect}, which results in a new connection.

FAILED This state is entered if the client library encounters a failure condition that it cannot recover from.

This may be a fatal connection error received from the Ably service, for example an attempt to connect
with an incorrect API key, or a local terminal error, for example the token in use has expired
and the library does not have any way to renew it. In the failed state, no reconnection attempts
are made automatically by the library, and clients may not publish messages. A new connection attempt
can be triggered by an explicit call to {Ably::Realtime::Connection#connect}.
ruby_enum('STATE',
  :initialized,
  :connecting,
  :connected,
  :disconnected,
  :suspended,
  :closing,
  :closed,
  :failed
)
EVENT =

Describes the events emitted by a Ably::Realtime::Connection object. An event is either an UPDATE or a STATE.

UPDATE RTN4h An event for changes to connection conditions for which the STATE does not change.

ruby_enum('EVENT',
  STATE.to_sym_arr + [:update]
)
RECOVER_REGEX =

Expected format for a connection recover key

/^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/
DEFAULTS =

Defaults for automatic connection recovery and timeouts

{
  channel_retry_timeout:      15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  suspended_retry_timeout:    30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  connection_state_ttl:       120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
  max_connection_state_ttl:   nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably
  realtime_request_timeout:   10,  # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
  websocket_heartbeats_disabled: false,
}.freeze
MAX_PROTOCOL_MESSAGE_BATCH_SIZE =

Max number of messages to bundle in a single ProtocolMessage

50

Instance Attribute Summary collapse

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary collapse

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_off, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client, options) ⇒ Connection

Returns a new instance of Connection.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/ably/realtime/connection.rb', line 169

def initialize(client, options)
  @client                        = client
  @__outgoing_message_queue__    = []
  @__pending_message_ack_queue__ = []

  @defaults = DEFAULTS.dup
  options.each do |key, val|
    @defaults[key] = val if DEFAULTS.has_key?(key)
  end if options.kind_of?(Hash)
  @defaults.freeze

  # If a recover client options is provided, then we need to ensure that the msgSerial matches the
  # recover serial immediately at client library instantiation. This is done immediately so that any queued
  # publishes use the correct serial number for these queued messages as well.
  # There is no harm if the msgSerial is higher than expected if the recover fails.
  recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
  if recovery_msg_serial
    @client_msg_serial = recovery_msg_serial
  else
    reset_client_msg_serial
  end

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ConnectionManager.new(self)

  @current_host = client.endpoint.host
end

Instance Attribute Details

#__incoming_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal incoming protocol message bus.

Returns:



397
398
399
# File 'lib/ably/realtime/connection.rb', line 397

def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__outgoing_message_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage unsent outgoing messages. You should never interface with this array directly

Returns:

  • (Array)


155
156
157
# File 'lib/ably/realtime/connection.rb', line 155

def __outgoing_message_queue__
  @__outgoing_message_queue__
end

#__outgoing_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal outgoing protocol message bus.

Returns:



390
391
392
# File 'lib/ably/realtime/connection.rb', line 390

def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__pending_message_ack_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage sent messages. You should never interface with this array directly

Returns:

  • (Array)


160
161
162
# File 'lib/ably/realtime/connection.rb', line 160

def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end

#clientAbly::Realtime::Client (readonly)

Ably::Realtime::Client associated with this connection



140
141
142
# File 'lib/ably/realtime/connection.rb', line 140

def client
  @client
end

#current_hostString (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The current host that is configured following a call to method #determine_host.

Returns:

  • (String)

    The current host that is configured following a call to method #determine_host



424
425
426
# File 'lib/ably/realtime/connection.rb', line 424

def current_host
  @current_host
end

#defaultsHash (readonly)

Configured recovery and timeout defaults for this Ably::Realtime::Connection. See the configurable options in Ably::Realtime::Client#initialize. The defaults are immutable

Returns:

  • (Hash)


166
167
168
# File 'lib/ably/realtime/connection.rb', line 166

def defaults
  @defaults
end

#detailsAbly::Models::ConnectionDetails (readonly)

Connection details of the currently established connection



136
137
138
# File 'lib/ably/realtime/connection.rb', line 136

def details
  @details
end

#error_reasonAbly::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

An Models::ErrorInfo object describing the last error received if a connection failure occurs.



132
133
134
# File 'lib/ably/realtime/connection.rb', line 132

def error_reason
  @error_reason
end

#idString (readonly)

A unique public identifier for this connection, used to identify this member.

Returns:

  • (String)


103
104
105
# File 'lib/ably/realtime/connection.rb', line 103

def id
  @id
end

#keyString (readonly)

A unique private connection key used to recover or resume a connection, assigned by Ably. When recovering a connection explicitly, the recoveryKey is used in the recover client options as it contains both the key and the last message serial. This private connection key can also be used by other REST clients to publish on behalf of this client. See the publishing over REST on behalf of a realtime client docs for more info.

Returns:

  • (String)


114
115
116
# File 'lib/ably/realtime/connection.rb', line 114

def key
  @key
end

#loggerLogger (readonly)

Returns The Logger for this client. Configure the log_level with the ‘:log_level` option, refer to Ably::Realtime::Client#initialize.

Returns:



435
436
437
# File 'lib/ably/realtime/connection.rb', line 435

def logger
  client.logger
end

#managerAbly::Realtime::Connection::ConnectionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Connection manager responsible for creating, maintaining and closing the connection and underlying transport



150
151
152
# File 'lib/ably/realtime/connection.rb', line 150

def manager
  @manager
end

#portInteger (readonly)

Returns The default port used for this connection.

Returns:

  • (Integer)

    The default port used for this connection



428
429
430
# File 'lib/ably/realtime/connection.rb', line 428

def port
  client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end

#serialInteger (readonly)

The serial number of the last message to be received on this connection, used automatically by the library when recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in the recover client options as it contains both the key and the last message serial.

Returns:

  • (Integer)


124
125
126
# File 'lib/ably/realtime/connection.rb', line 124

def serial
  @serial
end

#transportAbly::Realtime::Connection::WebsocketTransport (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Underlying socket transport used for this connection, for internal use by the client library



145
146
147
# File 'lib/ably/realtime/connection.rb', line 145

def transport
  @transport
end

Instance Method Details

#add_message_to_outgoing_queue(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



456
457
458
# File 'lib/ably/realtime/connection.rb', line 456

def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end

#can_publish_messages?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns false if messages cannot be published as a result of message queueing being disabled

Returns:

  • (Boolean)


567
568
569
570
# File 'lib/ably/realtime/connection.rb', line 567

def can_publish_messages?
  connected? ||
    ( (initialized? || connecting? || disconnected?) && client.queue_messages )
end

#clear_error_reasonObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



538
539
540
# File 'lib/ably/realtime/connection.rb', line 538

def clear_error_reason
  @error_reason = nil
end

#close { ... } ⇒ EventMachine::Deferrable

Causes the connection to close, entering the STATE CLOSING state. Once closed, the library does not attempt to re-establish the connection without an explicit call to #connect.

Yields:

  • block is called as soon as this connection is in the Closed state

Returns:

  • (EventMachine::Deferrable)


211
212
213
214
215
216
217
218
219
# File 'lib/ably/realtime/connection.rb', line 211

def close(&success_block)
  unless closing? || closed?
    unless can_transition_to?(:closing)
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing))
    end
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end

#configure_new(connection_id, connection_key, connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Following a new connection being made, the connection ID, connection key and connection serial need to match the details provided by the server.



365
366
367
368
369
370
# File 'lib/ably/realtime/connection.rb', line 365

def configure_new(connection_id, connection_key, connection_serial)
  @id            = connection_id
  @key           = connection_key

  update_connection_serial connection_serial
end

#connect { ... } ⇒ EventMachine::Deferrable

Explicitly calling connect() is unnecessary unless the autoConnect attribute of the ClientOptions object is false. Unless already connected or connecting, this method causes the connection to open, entering the STATE CONNECTING state.

Yields:

  • block is called as soon as this connection is in the Connected state

Returns:

  • (EventMachine::Deferrable)


231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/ably/realtime/connection.rb', line 231

def connect(&success_block)
  unless connecting? || connected?
    unless can_transition_to?(:connecting)
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting))
    end
    # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first
    EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) }
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    deferrable.callback do
      yield if block_given?
    end
    succeed_callback = deferrable.method(:succeed)
    fail_callback    = deferrable.method(:fail)

    unsafe_once(:connected) do
      deferrable.succeed
      off(&fail_callback)
    end

    unsafe_once(:failed, :closed, :closing) do
      deferrable.fail
      off(&succeed_callback)
    end
  end
end

#connection_state_ttlObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



579
580
581
582
583
# File 'lib/ably/realtime/connection.rb', line 579

def connection_state_ttl
  defaults[:max_connection_state_ttl] || # undocumented max TTL configuration
    (details && details.connection_state_ttl) ||
    defaults.fetch(:connection_state_ttl)
end

#connection_state_ttl=(val) ⇒ Object



585
586
587
# File 'lib/ably/realtime/connection.rb', line 585

def connection_state_ttl=(val)
  @connection_state_ttl = val
end

#create_transport(host, port, url, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



573
574
575
576
# File 'lib/ably/realtime/connection.rb', line 573

def create_transport(host, port, url, &block)
  logger.debug { "Connection: EventMachine connecting to #{host}:#{port} with URL: #{url}" }
  EventMachine.connect(host, port, WebsocketTransport, self, url.to_s, &block)
end

#create_websocket_transportEventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (EventMachine::Deferrable)


467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
# File 'lib/ably/realtime/connection.rb', line 467

def create_websocket_transport
  EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
    # Getting auth params can be blocking so uses a Deferrable
    client.auth.auth_params.tap do |auth_deferrable|
      auth_deferrable.callback do |auth_params|
        url_params = auth_params.merge(
          'format' =>     client.protocol,
          'echo' =>       client.echo_messages,
          'v' =>          Ably::PROTOCOL_VERSION,
          'agent' =>      client.rest_client.agent
        )

        # Use native websocket heartbeats if possible, but allow Ably protocol heartbeats
        url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled)
          'true'
        else
          'false'
        end

        url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
        url_params.merge!(client.transport_params)

        if connection_resumable?
          url_params.merge! resume: key, connection_serial: serial
          logger.debug { "Resuming connection key #{key} with serial #{serial}" }
        elsif connection_recoverable?
          url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
          logger.debug { "Recovering connection with key #{client.recover}" }
          unsafe_once(:connected, :closed, :failed) do
            client.disable_automatic_connection_recovery
          end
        end

        url = URI(client.endpoint).tap do |endpoint|
          endpoint.query = URI.encode_www_form(url_params)
        end

        determine_host do |host|
          # Ensure the hostname matches the fallback host name
          url.hostname = host
          url.port = port

          begin
            logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" }
            @transport = create_transport(host, port, url) do |websocket_transport|
              websocket_deferrable.succeed websocket_transport
            end
          rescue EventMachine::ConnectionError => error
            websocket_deferrable.fail error
          end
        end
      end

      auth_deferrable.errback do |error|
        websocket_deferrable.fail error
      end
    end
  end
end

#determine_host {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Determines the correct host name to use for the next connection attempt and updates current_host

Yields:

  • (String)

    The host name used for this connection, for network connection failures a fallback host is used to route around networking or intermittent problems if an Internet connection is available

Raises:

  • (ArgumentError)


404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/ably/realtime/connection.rb', line 404

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if should_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end

#heartbeat_intervalObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



590
591
592
593
594
# File 'lib/ably/realtime/connection.rb', line 590

def heartbeat_interval
  # See RTN23a
  (details && details.max_idle_interval).to_i +
    defaults.fetch(:realtime_request_timeout)
end

#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields:

  • (Boolean)

    True if an internet connection check appears to be up following an HTTP request to a reliable CDN

Returns:

  • (EventMachine::Deferrable)


327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/ably/realtime/connection.rb', line 327

def internet_up?
  url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED)
      end
      http.callback do
        EventMachine.next_tick do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          if result
            deferrable.succeed
          else
            deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, Ably::Exceptions::Codes::BAD_REQUEST)
          end
        end
      end
    end
  end
end

#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



461
462
463
# File 'lib/ably/realtime/connection.rb', line 461

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end

#off_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove a registered connection resume callback



561
562
563
# File 'lib/ably/realtime/connection.rb', line 561

def off_resume(&callback)
  resume_callbacks.delete(callback)
end

#on_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides a simple hook to inject a callback when a connection is successfully resumed



555
556
557
# File 'lib/ably/realtime/connection.rb', line 555

def on_resume(&callback)
  resume_callbacks << callback
end

#ping {|Integer| ... } ⇒ Ably::Util::SafeDeferrable

When connected, sends a heartbeat ping to the Ably server and executes the callback with any error and the response time in milliseconds when a heartbeat ping request is echoed from the server. This can be useful for measuring true round-trip latency to the connected Ably server.

Examples:

client = Ably::Rest::Client.new(key: 'key.id:secret')
client.connection.ping do |elapsed_s|
  puts "Ping took #{elapsed_s}s"
end

Yields:

  • (Integer)

    if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in seconds. If the ping is not received within an acceptable timeframe, the block will be called with nil as he first argument

Returns:



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/ably/realtime/connection.rb', line 276

def ping(&block)
  if initialized? || suspended? || closing? || closed? || failed?
    error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    started = nil
    finished = false
    ping_id = SecureRandom.hex(16)
    heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat

    wait_for_ping = lambda do |protocol_message|
      next if finished
      if protocol_message.action == heartbeat_action && protocol_message.id == ping_id
        finished = true
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = Time.now.to_f - started.to_f
        deferrable.succeed time_passed
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      next if finished
      started = Time.now
      send_protocol_message action: heartbeat_action.to_i, id: ping_id
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end

    once_or_if([:suspended, :closing, :closed, :failed]) do
      next if finished
      finished = true
      deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
    end

    EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
      next if finished
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
      logger.warn { error_msg }
      deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: Ably::Exceptions::Codes::TIMEOUT_ERROR)
      safe_yield block, nil if block_given?
    end
  end
end

#recovery_keyString

The recovery key string can be used by another client to recover this connection’s state in the recover client options property. See connection state recover options for more information.

Returns:

  • (String)


356
357
358
# File 'lib/ably/realtime/connection.rb', line 356

def recovery_key
  "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
end

#release_websocket_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



528
529
530
# File 'lib/ably/realtime/connection.rb', line 528

def release_websocket_transport
  @transport = nil
end

#reset_client_msg_serialObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Resets the client message serial (msgSerial) sent to Ably for each new Models::ProtocolMessage (see #client_msg_serial)



599
600
601
# File 'lib/ably/realtime/connection.rb', line 599

def reset_client_msg_serial
  @client_msg_serial = -1
end

#reset_resume_infovoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Disable automatic resume of a connection



382
383
384
385
# File 'lib/ably/realtime/connection.rb', line 382

def reset_resume_info
  @key    = nil
  @serial = nil
end

#send_protocol_message(protocol_message) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent

Parameters:



445
446
447
448
449
450
451
452
453
# File 'lib/ably/realtime/connection.rb', line 445

def send_protocol_message(protocol_message)
  add_message_serial_if_ack_required_to(protocol_message) do
    Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
      add_message_to_outgoing_queue message
      notify_message_dispatcher_of_new_message message
      logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
    end
  end
end

#set_connection_confirmed_aliveObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a hearbeat or any other message from Ably is received we know it’s alive, see #RTN23



606
607
608
609
# File 'lib/ably/realtime/connection.rb', line 606

def set_connection_confirmed_alive
  @last_liveness_event = Time.now
  manager.reset_liveness_timer
end

#set_connection_details(connection_details) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



543
544
545
# File 'lib/ably/realtime/connection.rb', line 543

def set_connection_details(connection_details)
  @details = connection_details
end

#set_failed_connection_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



533
534
535
# File 'lib/ably/realtime/connection.rb', line 533

def set_failed_connection_error_reason(error)
  @error_reason = error
end

#time_since_connection_confirmed_alive?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


612
613
614
# File 'lib/ably/realtime/connection.rb', line 612

def time_since_connection_confirmed_alive?
  Time.now.to_i - @last_liveness_event.to_i
end

#trigger_resumedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes registered callbacks for a successful connection resume event



549
550
551
# File 'lib/ably/realtime/connection.rb', line 549

def trigger_resumed
  resume_callbacks.each(&:call)
end

#update_connection_serial(connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Store last received connection serial so that the connection can be resumed from the last known point-in-time



375
376
377
# File 'lib/ably/realtime/connection.rb', line 375

def update_connection_serial(connection_serial)
  @serial = connection_serial
end