Class: Ably::Realtime::Connection

Inherits:
Object
  • Object
show all
Extended by:
Modules::Enum
Includes:
Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
Defined in:
lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb

Overview

The Connection class represents the connection associated with an Ably Realtime instance. The Connection object exposes the lifecycle and parameters of the realtime connection.

Connections will always be in one of the following states:

initialized:  0
connecting:   1
connected:    2
disconnected: 3
suspended:    4
closing:      5
closed:       6
failed:       7

Note that the states are available as Enum-like constants:

Connection::STATE.Initialized
Connection::STATE.Connecting
Connection::STATE.Connected
Connection::STATE.Disconnected
Connection::STATE.Suspended
Connection::STATE.Closing
Connection::STATE.Closed
Connection::STATE.Failed

Connection emit errors - use ‘on(:error)` to subscribe to errors

Examples:

client = Ably::Realtime::Client.new('key.id:secret')
client.connection.on(:connected) do
  puts "Connected with connection ID: #{client.connection.id}"
end

Defined Under Namespace

Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport

Constant Summary collapse

STATE =

Valid Connection states

ruby_enum('STATE',
  :initialized,
  :connecting,
  :connected,
  :disconnected,
  :suspended,
  :closing,
  :closed,
  :failed
)
RECOVER_REGEX =

Expected format for a connection recover key

/^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/

Instance Attribute Summary collapse

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary collapse

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client) ⇒ Connection

Returns a new instance of Connection.



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/ably/realtime/connection.rb', line 103

def initialize(client)
  @client                        = client
  @client_serial                 = -1
  @__outgoing_message_queue__    = []
  @__pending_message_ack_queue__ = []

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ConnectionManager.new(self)
end

Instance Attribute Details

#__incoming_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal incoming protocol message bus.

Returns:



246
247
248
# File 'lib/ably/realtime/connection.rb', line 246

def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__outgoing_message_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage unsent outgoing messages. You should never interface with this array directly

Returns:

  • (Array)


95
96
97
# File 'lib/ably/realtime/connection.rb', line 95

def __outgoing_message_queue__
  @__outgoing_message_queue__
end

#__outgoing_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal outgoing protocol message bus.

Returns:



239
240
241
# File 'lib/ably/realtime/connection.rb', line 239

def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__pending_message_ack_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage sent messages. You should never interface with this array directly

Returns:

  • (Array)


100
101
102
# File 'lib/ably/realtime/connection.rb', line 100

def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end

#clientAbly::Realtime::Client (readonly)

Ably::Realtime::Client associated with this connection



80
81
82
# File 'lib/ably/realtime/connection.rb', line 80

def client
  @client
end

#current_hostString (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The current host that is configured following a call to method #determine_host.

Returns:

  • (String)

    The current host that is configured following a call to method #determine_host



273
274
275
# File 'lib/ably/realtime/connection.rb', line 273

def current_host
  @current_host
end

#error_reasonAbly::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

When a connection failure occurs this attribute contains the Ably Exception



76
77
78
# File 'lib/ably/realtime/connection.rb', line 76

def error_reason
  @error_reason
end

#idString (readonly)

A unique public identifier for this connection, used to identify this member in presence events and messages

Returns:

  • (String)


64
65
66
# File 'lib/ably/realtime/connection.rb', line 64

def id
  @id
end

#keyString (readonly)

A unique private connection key used to recover this connection, assigned by Ably

Returns:

  • (String)


68
69
70
# File 'lib/ably/realtime/connection.rb', line 68

def key
  @key
end

#loggerLogger (readonly)

Returns The Logger for this client. Configure the log_level with the :log_level option, refer to Ably::Realtime::Client#initialize.

Returns:



284
285
286
# File 'lib/ably/realtime/connection.rb', line 284

def logger
  client.logger
end

#managerAbly::Realtime::Connection::ConnectionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Connection manager responsible for creating, maintaining and closing the connection and underlying transport



90
91
92
# File 'lib/ably/realtime/connection.rb', line 90

def manager
  @manager
end

#portInteger (readonly)

Returns The default port used for this connection.

Returns:

  • (Integer)

    The default port used for this connection



277
278
279
# File 'lib/ably/realtime/connection.rb', line 277

def port
  client.use_tls? ? 443 : 80
end

#recovery_keyObject (readonly)



204
205
206
# File 'lib/ably/realtime/connection.rb', line 204

def recovery_key
  "#{key}:#{serial}" if connection_resumable?
end

#serialInteger (readonly)

The serial number of the last message to be received on this connection, used to recover or resume a connection

Returns:

  • (Integer)


72
73
74
# File 'lib/ably/realtime/connection.rb', line 72

def serial
  @serial
end

#stateAbly::Realtime::Connection::STATE (readonly)

Returns connection state.

Returns:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/ably/realtime/connection.rb', line 39

class Connection
  include Ably::Modules::EventEmitter
  include Ably::Modules::Conversions
  include Ably::Modules::SafeYield
  extend Ably::Modules::Enum

  # Valid Connection states
  STATE = ruby_enum('STATE',
    :initialized,
    :connecting,
    :connected,
    :disconnected,
    :suspended,
    :closing,
    :closed,
    :failed
  )
  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine

  # Expected format for a connection recover key
  RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/

  # A unique public identifier for this connection, used to identify this member in presence events and messages
  # @return [String]
  attr_reader :id

  # A unique private connection key used to recover this connection, assigned by Ably
  # @return [String]
  attr_reader :key

  # The serial number of the last message to be received on this connection, used to recover or resume a connection
  # @return [Integer]
  attr_reader :serial

  # When a connection failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # {Ably::Realtime::Client} associated with this connection
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Underlying socket transport used for this connection, for internal use by the client library
  # @return [Ably::Realtime::Connection::WebsocketTransport]
  # @api private
  attr_reader :transport

  # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
  # @return [Ably::Realtime::Connection::ConnectionManager]
  # @api private
  attr_reader :manager

  # An internal queue used to manage unsent outgoing messages.  You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__outgoing_message_queue__

  # An internal queue used to manage sent messages.  You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__pending_message_ack_queue__

  # @api public
  def initialize(client)
    @client                        = client
    @client_serial                 = -1
    @__outgoing_message_queue__    = []
    @__pending_message_ack_queue__ = []

    Client::IncomingMessageDispatcher.new client, self
    Client::OutgoingMessageDispatcher.new client, self

    @state_machine = ConnectionStateMachine.new(self)
    @state         = STATE(state_machine.current_state)
    @manager       = ConnectionManager.new(self)
  end

  # Causes the connection to close, entering the closed state, from any state except
  # the failed state. Once closed, the library will not attempt to re-establish the
  # connection without a call to {Connection#connect}.
  #
  # @yield [Ably::Realtime::Connection] block is called as soon as this connection is in the Closed state
  #
  # @return [EventMachine::Deferrable]
  #
  def close(&success_block)
    unless closing? || closed?
      raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
      transition_state_machine :closing
    end
    deferrable_for_state_change_to(STATE.Closed, &success_block)
  end

  # Causes the library to attempt connection.  If it was previously explicitly
  # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
  #
  # @yield [Ably::Realtime::Connection] block is called as soon as this connection is in the Connected state
  #
  # @return [EventMachine::Deferrable]
  #
  def connect(&success_block)
    unless connecting? || connected?
      raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
      transition_state_machine :connecting
    end
    deferrable_for_state_change_to(STATE.Connected, &success_block)
  end

  # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
  # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
  # The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
  #
  # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds
  #
  # @example
  #    client = Ably::Rest::Client.new(key: 'key.id:secret')
  #    client.connection.ping do |ms_elapsed|
  #      puts "Ping took #{ms_elapsed}ms"
  #    end
  #
  # @return [void]
  #
  def ping(&block)
    raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
    raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

    started = nil

    wait_for_ping = Proc.new do |protocol_message|
      if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      started = Time.now
      send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end
  end

  # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
  # @return [EventMachine::Deferrable]
  # @api private
  def internet_up?
    EventMachine::DefaultDeferrable.new.tap do |deferrable|
      EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http|
        http.errback do
          yield false if block_given?
          deferrable.fail
        end
        http.callback do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          deferrable.succeed
        end
      end
    end
  end

  # @!attribute [r] recovery_key
  #   @return [String] recovery key that can be used by another client to recover this connection with the :recover option
  def recovery_key
    "#{key}:#{serial}" if connection_resumable?
  end

  # Following a new connection being made, the connection ID, connection key
  # and message serial need to match the details provided by the server.
  #
  # @return [void]
  # @api private
  def configure_new(connection_id, connection_key, connection_serial)
    @id            = connection_id
    @key           = connection_key
    @client_serial = connection_serial

    update_connection_serial connection_serial
  end

  # Store last received connection serial so that the connection can be resumed from the last known point-in-time
  # @return [void]
  # @api private
  def update_connection_serial(connection_serial)
    @serial = connection_serial
  end

  # Disable automatic resume of a connection
  # @return [void]
  # @api private
  def reset_resume_info
    @key    = nil
    @serial = nil
  end

  # @!attribute [r] __outgoing_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
  # @api private
  def __outgoing_protocol_msgbus__
    @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # @!attribute [r] __incoming_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal incoming protocol message bus
  # @api private
  def __incoming_protocol_msgbus__
    @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # Determines the correct host name to use for the next connection attempt and updates current_host
  # @yield [String] The host name used for this connection, for network connection failures a {Ably::FALLBACK_HOSTS fallback host} is used to route around networking or intermittent problems if an Internet connection is available
  # @api private
  def determine_host
    raise ArgumentError, 'Block required' unless block_given?

    if can_use_fallback_hosts?
      internet_up? do |internet_is_up_result|
        @current_host = if internet_is_up_result
          client.fallback_endpoint.host
        else
          client.endpoint.host
        end
        yield current_host
      end
    else
      @current_host = client.endpoint.host
      yield current_host
    end
  end

  # @return [String] The current host that is configured following a call to method {#determine_host}
  # @api private
  attr_reader :current_host

  # @!attribute [r] port
  # @return [Integer] The default port used for this connection
  def port
    client.use_tls? ? 443 : 80
  end

  # @!attribute [r] logger
  # @return [Logger] The {Ably::Logger} for this client.
  #                  Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
  def logger
    client.logger
  end

  # Add protocol message to the outgoing message queue and notify the dispatcher that a message is
  # ready to be sent
  #
  # @param [Ably::Models::ProtocolMessage] protocol_message
  # @return [void]
  # @api private
  def send_protocol_message(protocol_message)
    add_message_serial_if_ack_required_to(protocol_message) do
      Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message|
        add_message_to_outgoing_queue protocol_message
        notify_message_dispatcher_of_new_message protocol_message
        logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}")
      end
    end
  end

  # @api private
  def add_message_to_outgoing_queue(protocol_message)
    __outgoing_message_queue__ << protocol_message
  end

  # @api private
  def notify_message_dispatcher_of_new_message(protocol_message)
    __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
  end

  # @api private
  def create_websocket_transport
    raise ArgumentError, 'Block required' unless block_given?

    blocking_operation = proc do
      URI(client.endpoint).tap do |endpoint|
        url_params = client.auth.auth_params.merge(
          timestamp: as_since_epoch(Time.now),
          format:    client.protocol,
          echo:      client.echo_messages
        )

        if connection_resumable?
          url_params.merge! resume: key, connection_serial: serial
          logger.debug "Resuming connection key #{key} with serial #{serial}"
        elsif connection_recoverable?
          url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
          logger.debug "Recovering connection with key #{client.recover}"
          once(:connected, :closed, :failed) do
            client.disable_automatic_connection_recovery
          end
        end

        endpoint.query = URI.encode_www_form(url_params)
      end.to_s
    end

    callback = proc do |url|
      determine_host do |host|
        begin
          logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
          @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
            yield websocket_transport if block_given?
          end
        rescue EventMachine::ConnectionError => error
          manager.connection_opening_failed error
        end
      end
    end

    # client.auth.auth_params is a blocking call, so defer this into a thread
    EventMachine.defer blocking_operation, callback
  end

  # @api private
  def release_websocket_transport
    @transport = nil
  end

  # @api private
  def set_failed_connection_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # Executes registered callbacks for a successful connection resume event
  # @api private
  def resumed
    resume_callbacks.each(&:call)
  end

  # Provides a simple hook to inject a callback when a connection is successfully resumed
  # @api private
  def on_resume(&callback)
    resume_callbacks << callback
  end

  # Remove a registered connection resume callback
  # @api private
  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private

  # The client serial is incremented for every message that is published that requires an ACK.
  # Note that this is different to the connection serial that contains the last known serial number
  # received from the server.
  #
  # A client serial number therefore does not guarantee a message has been received, only sent.
  # A connection serial guarantees the server has received the message and is thus used for connection
  # recovery and resumes.
  # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
  attr_reader :client_serial

  def resume_callbacks
    @resume_callbacks ||= []
  end

  def create_pub_sub_message_bus
    Ably::Util::PubSub.new(
      coerce_into: Proc.new do |event|
        raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
        :protocol_message
      end
    )
  end

  def add_message_serial_if_ack_required_to(protocol_message)
    if Ably::Models::ProtocolMessage.ack_required?(protocol_message[:action])
      add_message_serial_to(protocol_message) { yield }
    else
      yield
    end
  end

  def add_message_serial_to(protocol_message)
    @client_serial += 1
    protocol_message[:msgSerial] = client_serial
    yield
  rescue StandardError => e
    @client_serial -= 1
    raise e
  end

  # Simply wait until the next EventMachine tick to ensure Connection initialization is complete
  def when_initialized
    EventMachine.next_tick { yield }
  end

  def connection_resumable?
    !key.nil? && !serial.nil?
  end

  def connection_recoverable?
    connection_recover_parts
  end

  def connection_recover_parts
    client.recover.to_s.match(RECOVER_REGEX)
  end

  def can_use_fallback_hosts?
    if client.environment.nil? && client.custom_realtime_host.nil?
      if connecting? && previous_state
        use_fallback_if_disconnected? || use_fallback_if_suspended?
      end
    end
  end

  def use_fallback_if_disconnected?
    second_reconnect_attempt_for(:disconnected, 1)
  end

  def use_fallback_if_suspended?
    second_reconnect_attempt_for(:suspended, 2) # on first suspended state use default Ably host again
  end

  def second_reconnect_attempt_for(state, first_attempt_count)
    previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
  end
end

#transportAbly::Realtime::Connection::WebsocketTransport (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Underlying socket transport used for this connection, for internal use by the client library



85
86
87
# File 'lib/ably/realtime/connection.rb', line 85

def transport
  @transport
end

Instance Method Details

#add_message_to_outgoing_queue(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



305
306
307
# File 'lib/ably/realtime/connection.rb', line 305

def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end

#clear_error_reasonObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



369
370
371
# File 'lib/ably/realtime/connection.rb', line 369

def clear_error_reason
  @error_reason = nil
end

#close {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable

Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to #connect.

Yields:

Returns:

  • (EventMachine::Deferrable)


125
126
127
128
129
130
131
# File 'lib/ably/realtime/connection.rb', line 125

def close(&success_block)
  unless closing? || closed?
    raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end

#configure_new(connection_id, connection_key, connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.



213
214
215
216
217
218
219
# File 'lib/ably/realtime/connection.rb', line 213

def configure_new(connection_id, connection_key, connection_serial)
  @id            = connection_id
  @key           = connection_key
  @client_serial = connection_serial

  update_connection_serial connection_serial
end

#connect {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable

Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.

Yields:

Returns:

  • (EventMachine::Deferrable)


140
141
142
143
144
145
146
# File 'lib/ably/realtime/connection.rb', line 140

def connect(&success_block)
  unless connecting? || connected?
    raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
    transition_state_machine :connecting
  end
  deferrable_for_state_change_to(STATE.Connected, &success_block)
end

#create_websocket_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:

  • (ArgumentError)


315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/ably/realtime/connection.rb', line 315

def create_websocket_transport
  raise ArgumentError, 'Block required' unless block_given?

  blocking_operation = proc do
    URI(client.endpoint).tap do |endpoint|
      url_params = client.auth.auth_params.merge(
        timestamp: as_since_epoch(Time.now),
        format:    client.protocol,
        echo:      client.echo_messages
      )

      if connection_resumable?
        url_params.merge! resume: key, connection_serial: serial
        logger.debug "Resuming connection key #{key} with serial #{serial}"
      elsif connection_recoverable?
        url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
        logger.debug "Recovering connection with key #{client.recover}"
        once(:connected, :closed, :failed) do
          client.disable_automatic_connection_recovery
        end
      end

      endpoint.query = URI.encode_www_form(url_params)
    end.to_s
  end

  callback = proc do |url|
    determine_host do |host|
      begin
        logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
        @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
          yield websocket_transport if block_given?
        end
      rescue EventMachine::ConnectionError => error
        manager.connection_opening_failed error
      end
    end
  end

  # client.auth.auth_params is a blocking call, so defer this into a thread
  EventMachine.defer blocking_operation, callback
end

#determine_host {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Determines the correct host name to use for the next connection attempt and updates current_host

Yields:

  • (String)

    The host name used for this connection, for network connection failures a fallback host is used to route around networking or intermittent problems if an Internet connection is available

Raises:

  • (ArgumentError)


253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/ably/realtime/connection.rb', line 253

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if can_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end

#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields:

  • (Boolean)

    True if an internet connection check appears to be up following an HTTP request to a reliable CDN

Returns:

  • (EventMachine::Deferrable)


186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/ably/realtime/connection.rb', line 186

def internet_up?
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail
      end
      http.callback do
        result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
        yield result if block_given?
        deferrable.succeed
      end
    end
  end
end

#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



310
311
312
# File 'lib/ably/realtime/connection.rb', line 310

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end

#off_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove a registered connection resume callback



387
388
389
# File 'lib/ably/realtime/connection.rb', line 387

def off_resume(&callback)
  resume_callbacks.delete(callback)
end

#on_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides a simple hook to inject a callback when a connection is successfully resumed



381
382
383
# File 'lib/ably/realtime/connection.rb', line 381

def on_resume(&callback)
  resume_callbacks << callback
end

#ping {|Integer| ... } ⇒ void

This method returns an undefined value.

Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server. This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently. The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the :connected state.

Examples:

client = Ably::Rest::Client.new(key: 'key.id:secret')
client.connection.ping do |ms_elapsed|
  puts "Ping took #{ms_elapsed}ms"
end

Yields:

  • (Integer)

    if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds

Raises:

  • (RuntimeError)


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/ably/realtime/connection.rb', line 162

def ping(&block)
  raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
  raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

  started = nil

  wait_for_ping = Proc.new do |protocol_message|
    if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
      safe_yield block, time_passed if block_given?
    end
  end

  once_or_if(STATE.Connected) do
    started = Time.now
    send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
    __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
  end
end

#release_websocket_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



359
360
361
# File 'lib/ably/realtime/connection.rb', line 359

def release_websocket_transport
  @transport = nil
end

#reset_resume_infovoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Disable automatic resume of a connection



231
232
233
234
# File 'lib/ably/realtime/connection.rb', line 231

def reset_resume_info
  @key    = nil
  @serial = nil
end

#resumedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes registered callbacks for a successful connection resume event



375
376
377
# File 'lib/ably/realtime/connection.rb', line 375

def resumed
  resume_callbacks.each(&:call)
end

#send_protocol_message(protocol_message) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent

Parameters:



294
295
296
297
298
299
300
301
302
# File 'lib/ably/realtime/connection.rb', line 294

def send_protocol_message(protocol_message)
  add_message_serial_if_ack_required_to(protocol_message) do
    Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message|
      add_message_to_outgoing_queue protocol_message
      notify_message_dispatcher_of_new_message protocol_message
      logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}")
    end
  end
end

#set_failed_connection_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



364
365
366
# File 'lib/ably/realtime/connection.rb', line 364

def set_failed_connection_error_reason(error)
  @error_reason = error
end

#update_connection_serial(connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Store last received connection serial so that the connection can be resumed from the last known point-in-time



224
225
226
# File 'lib/ably/realtime/connection.rb', line 224

def update_connection_serial(connection_serial)
  @serial = connection_serial
end