Class: Ably::Realtime::Connection
- Inherits:
- Object
- Inheritance hierarchy:
- Object, Ably::Realtime::Connection
- Extended by:
- Modules::Enum
- Includes:
- Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
- Defined in:
- lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb
Overview
The Connection class represents the connection associated with an Ably Realtime instance. The Connection object exposes the lifecycle and parameters of the realtime connection.
Connections will always be in one of the following states:
initialized: 0
connecting: 1
connected: 2
disconnected: 3
suspended: 4
closing: 5
closed: 6
failed: 7
Note that the states are available as Enum-like constants:
Connection::STATE.Initialized
Connection::STATE.Connecting
Connection::STATE.Connected
Connection::STATE.Disconnected
Connection::STATE.Suspended
Connection::STATE.Closing
Connection::STATE.Closed
Connection::STATE.Failed
Connection emits errors; use `on(:error)` to subscribe to them.
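The integers listed above follow the order in which the states are declared. A minimal sketch of that mapping in plain Ruby, without loading the library: the `STATES` array and the `state_index` helper are illustrative stand-ins and are not part of Ably's API.

```ruby
# Illustrative stand-in for the STATE enum: symbols in declaration order.
STATES = %i[
  initialized connecting connected disconnected
  suspended closing closed failed
].freeze

# Hypothetical helper: returns the integer listed above for a state symbol.
def state_index(state)
  STATES.index(state) or raise ArgumentError, "unknown state: #{state.inspect}"
end

puts state_index(:connected)
puts STATES[7]
```

In the library itself the same information is reached through the Connection::STATE constants shown above.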
Defined Under Namespace
Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport
Constant Summary
- STATE =
Valid Connection states
ruby_enum('STATE', :initialized, :connecting, :connected, :disconnected, :suspended, :closing, :closed, :failed )
- RECOVER_REGEX =
Expected format for a connection recover key
/^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/
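To make the recover key format concrete, here is a small sketch that applies the regular expression above to a few strings. The `parse_recover_key` helper and the sample keys are hypothetical; only the pattern itself comes from this page.

```ruby
# Regular expression copied from the RECOVER_REGEX constant above.
RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/

# Hypothetical helper: splits a recover key into its two named parts,
# or returns nil when the string does not match the expected format.
def parse_recover_key(value)
  match = value.to_s.match(RECOVER_REGEX)
  return nil unless match

  { recover: match[:recover], connection_serial: match[:connection_serial] }
end

p parse_recover_key('abc-DEF_123:42')
p parse_recover_key('abc-DEF_123:-1')
p parse_recover_key('not a key')
```

Both captured parts are strings; the serial may carry a leading minus sign.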
Instance Attribute Summary
-
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal incoming protocol message bus.
-
#__outgoing_message_queue__ ⇒ Array
readonly
private
An internal queue used to manage unsent outgoing messages.
-
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal outgoing protocol message bus.
-
#__pending_message_ack_queue__ ⇒ Array
readonly
private
An internal queue used to manage sent messages.
-
#client ⇒ Ably::Realtime::Client
readonly
Client associated with this connection.
-
#current_host ⇒ String
readonly
private
The current host that is configured following a call to method #determine_host.
-
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException
readonly
When a connection failure occurs this attribute contains the Ably Exception.
-
#id ⇒ String
readonly
A unique public identifier for this connection, used to identify this member in presence events and messages.
-
#key ⇒ String
readonly
A unique private connection key used to recover this connection, assigned by Ably.
-
#logger ⇒ Logger
readonly
The Logger for this client.
-
#manager ⇒ Ably::Realtime::Connection::ConnectionManager
readonly
private
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport.
-
#port ⇒ Integer
readonly
The default port used for this connection.
- #recovery_key ⇒ Object readonly
-
#serial ⇒ Integer
readonly
The serial number of the last message to be received on this connection, used to recover or resume a connection.
-
#state ⇒ Ably::Realtime::Connection::STATE
readonly
Connection state.
-
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport
readonly
private
Underlying socket transport used for this connection, for internal use by the client library.
Attributes included from Modules::UsesStateMachine
#previous_state, #state_history
Instance Method Summary
- #add_message_to_outgoing_queue(protocol_message) ⇒ Object private
- #clear_error_reason ⇒ Object private
-
#close {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable
Causes the connection to close, entering the closed state, from any state except the failed state.
-
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
private
Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.
-
#connect {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable
Causes the library to attempt connection.
- #create_websocket_transport ⇒ Object private
-
#determine_host {|String| ... } ⇒ Object
private
Determines the correct host name to use for the next connection attempt and updates current_host.
-
#initialize(client) ⇒ Connection
constructor
A new instance of Connection.
- #internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable private
- #notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object private
-
#off_resume(&callback) ⇒ Object
private
Remove a registered connection resume callback.
-
#on_resume(&callback) ⇒ Object
private
Provides a simple hook to inject a callback when a connection is successfully resumed.
-
#ping {|Integer| ... } ⇒ void
Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
- #release_websocket_transport ⇒ Object private
-
#reset_resume_info ⇒ void
private
Disable automatic resume of a connection.
-
#resumed ⇒ Object
private
Executes registered callbacks for a successful connection resume event.
-
#send_protocol_message(protocol_message) ⇒ void
private
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent.
- #set_failed_connection_error_reason(error) ⇒ Object private
-
#update_connection_serial(connection_serial) ⇒ void
private
Store last received connection serial so that the connection can be resumed from the last known point-in-time.
Methods included from Modules::UsesStateMachine
#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!
Methods included from Modules::StateEmitter
#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed
Methods included from Modules::EventEmitter
#emit, #off, #on, #once, #unsafe_on, #unsafe_once
Constructor Details
#initialize(client) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/ably/realtime/connection.rb', line 103

def initialize(client)
  @client = client
  @client_serial = -1
  @__outgoing_message_queue__ = []
  @__pending_message_ack_queue__ = []

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state = STATE(state_machine.current_state)
  @manager = ConnectionManager.new(self)
end
Instance Attribute Details
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the client library's internal incoming protocol message bus.
# File 'lib/ably/realtime/connection.rb', line 246

def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
end
#__outgoing_message_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage unsent outgoing messages. You should never interface with this array directly.
# File 'lib/ably/realtime/connection.rb', line 95

def __outgoing_message_queue__
  @__outgoing_message_queue__
end
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the client library's internal outgoing protocol message bus.
# File 'lib/ably/realtime/connection.rb', line 239

def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
end
#__pending_message_ack_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage sent messages. You should never interface with this array directly.
# File 'lib/ably/realtime/connection.rb', line 100

def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end
#client ⇒ Ably::Realtime::Client (readonly)
Ably::Realtime::Client associated with this connection
# File 'lib/ably/realtime/connection.rb', line 80

def client
  @client
end
#current_host ⇒ String (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the current host, as configured by the most recent call to #determine_host.
# File 'lib/ably/realtime/connection.rb', line 273

def current_host
  @current_host
end
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)
When a connection failure occurs this attribute contains the Ably Exception
# File 'lib/ably/realtime/connection.rb', line 76

def error_reason
  @error_reason
end
#id ⇒ String (readonly)
A unique public identifier for this connection, used to identify this member in presence events and messages
# File 'lib/ably/realtime/connection.rb', line 64

def id
  @id
end
#key ⇒ String (readonly)
A unique private connection key used to recover this connection, assigned by Ably
# File 'lib/ably/realtime/connection.rb', line 68

def key
  @key
end
#logger ⇒ Logger (readonly)
Returns the Logger for this client. Configure the log level with the :log_level option; refer to Ably::Realtime::Client#initialize.
# File 'lib/ably/realtime/connection.rb', line 284

def logger
  client.logger
end
#manager ⇒ Ably::Realtime::Connection::ConnectionManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
# File 'lib/ably/realtime/connection.rb', line 90

def manager
  @manager
end
#port ⇒ Integer (readonly)
Returns the default port used for this connection: 443 when TLS is in use, otherwise 80.
# File 'lib/ably/realtime/connection.rb', line 277

def port
  client.use_tls? ? 443 : 80
end
#recovery_key ⇒ Object (readonly)
# File 'lib/ably/realtime/connection.rb', line 204

def recovery_key
  "#{key}:#{serial}" if connection_resumable?
end
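As a rough illustration of how the recovery key is assembled, the sketch below uses a hypothetical `ConnectionInfo` struct in place of a real connection. It assumes, as the class source on this page indicates, that a connection is resumable only when both the key and the serial are present.

```ruby
# Hypothetical stand-in for the key/serial pair held by a connection.
ConnectionInfo = Struct.new(:key, :serial) do
  # Mirrors the source above: a key is only produced when both parts are known.
  def recovery_key
    "#{key}:#{serial}" if !key.nil? && !serial.nil?
  end
end

p ConnectionInfo.new('abc123', 5).recovery_key
p ConnectionInfo.new(nil, nil).recovery_key
```

The resulting string has the shape expected by RECOVER_REGEX, so it can be passed to another client through the :recover option.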
#serial ⇒ Integer (readonly)
The serial number of the last message to be received on this connection, used to recover or resume a connection
# File 'lib/ably/realtime/connection.rb', line 72

def serial
  @serial
end
#state ⇒ Ably::Realtime::Connection::STATE (readonly)
Returns connection state.
# File 'lib/ably/realtime/connection.rb', line 39

class Connection
  include Ably::Modules::EventEmitter
  include Ably::Modules::Conversions
  include Ably::Modules::SafeYield
  extend Ably::Modules::Enum

  # Valid Connection states
  STATE = ruby_enum('STATE',
    :initialized,
    :connecting,
    :connected,
    :disconnected,
    :suspended,
    :closing,
    :closed,
    :failed
  )
  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine

  # Expected format for a connection recover key
  RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/

  # A unique public identifier for this connection, used to identify this member in presence events and messages
  # @return [String]
  attr_reader :id

  # A unique private connection key used to recover this connection, assigned by Ably
  # @return [String]
  attr_reader :key

  # The serial number of the last message to be received on this connection, used to recover or resume a connection
  # @return [Integer]
  attr_reader :serial

  # When a connection failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # {Ably::Realtime::Client} associated with this connection
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Underlying socket transport used for this connection, for internal use by the client library
  # @return [Ably::Realtime::Connection::WebsocketTransport]
  # @api private
  attr_reader :transport

  # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
  # @return [Ably::Realtime::Connection::ConnectionManager]
  # @api private
  attr_reader :manager

  # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__outgoing_message_queue__

  # An internal queue used to manage sent messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__pending_message_ack_queue__

  # @api public
  def initialize(client)
    @client = client
    @client_serial = -1
    @__outgoing_message_queue__ = []
    @__pending_message_ack_queue__ = []

    Client::IncomingMessageDispatcher.new client, self
    Client::OutgoingMessageDispatcher.new client, self

    @state_machine = ConnectionStateMachine.new(self)
    @state = STATE(state_machine.current_state)
    @manager = ConnectionManager.new(self)
  end

  # Causes the connection to close, entering the closed state, from any state except
  # the failed state. Once closed, the library will not attempt to re-establish the
  # connection without a call to {Connection#connect}.
  #
  # @yield [Ably::Realtime::Connection] block is called as soon as this connection is in the Closed state
  #
  # @return [EventMachine::Deferrable]
  #
  def close(&success_block)
    unless closing? || closed?
      raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
      transition_state_machine :closing
    end
    deferrable_for_state_change_to(STATE.Closed, &success_block)
  end

  # Causes the library to attempt connection. If it was previously explicitly
  # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
  #
  # @yield [Ably::Realtime::Connection] block is called as soon as this connection is in the Connected state
  #
  # @return [EventMachine::Deferrable]
  #
  def connect(&success_block)
    unless connecting? || connected?
      raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
      transition_state_machine :connecting
    end
    deferrable_for_state_change_to(STATE.Connected, &success_block)
  end

  # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
  # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
  # The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
  #
  # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds
  #
  # @example
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret')
  #    client.connection.ping do |ms_elapsed|
  #      puts "Ping took #{ms_elapsed}ms"
  #    end
  #
  # @return [void]
  #
  def ping(&block)
    raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
    raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

    started = nil

    wait_for_ping = Proc.new do |protocol_message|
      if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      started = Time.now
      send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end
  end

  # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
  # @return [EventMachine::Deferrable]
  # @api private
  def internet_up?
    EventMachine::DefaultDeferrable.new.tap do |deferrable|
      EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http|
        http.errback do
          yield false if block_given?
          deferrable.fail
        end
        http.callback do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          deferrable.succeed
        end
      end
    end
  end

  # @!attribute [r] recovery_key
  # @return [String] recovery key that can be used by another client to recover this connection with the :recover option
  def recovery_key
    "#{key}:#{serial}" if connection_resumable?
  end

  # Following a new connection being made, the connection ID, connection key
  # and message serial need to match the details provided by the server.
  #
  # @return [void]
  # @api private
  def configure_new(connection_id, connection_key, connection_serial)
    @id = connection_id
    @key = connection_key
    @client_serial = connection_serial

    update_connection_serial connection_serial
  end

  # Store last received connection serial so that the connection can be resumed from the last known point-in-time
  # @return [void]
  # @api private
  def update_connection_serial(connection_serial)
    @serial = connection_serial
  end

  # Disable automatic resume of a connection
  # @return [void]
  # @api private
  def reset_resume_info
    @key = nil
    @serial = nil
  end

  # @!attribute [r] __outgoing_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
  # @api private
  def __outgoing_protocol_msgbus__
    @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # @!attribute [r] __incoming_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal incoming protocol message bus
  # @api private
  def __incoming_protocol_msgbus__
    @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # Determines the correct host name to use for the next connection attempt and updates current_host
  # @yield [String] The host name used for this connection, for network connection failures a {Ably::FALLBACK_HOSTS fallback host} is used to route around networking or intermittent problems if an Internet connection is available
  # @api private
  def determine_host
    raise ArgumentError, 'Block required' unless block_given?

    if can_use_fallback_hosts?
      internet_up? do |internet_is_up_result|
        @current_host = if internet_is_up_result
          client.fallback_endpoint.host
        else
          client.endpoint.host
        end
        yield current_host
      end
    else
      @current_host = client.endpoint.host
      yield current_host
    end
  end

  # @return [String] The current host that is configured following a call to method {#determine_host}
  # @api private
  attr_reader :current_host

  # @!attribute [r] port
  # @return [Integer] The default port used for this connection
  def port
    client.use_tls? ? 443 : 80
  end

  # @!attribute [r] logger
  # @return [Logger] The {Ably::Logger} for this client.
  #                  Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
  def logger
    client.logger
  end

  # Add protocol message to the outgoing message queue and notify the dispatcher that a message is
  # ready to be sent
  #
  # @param [Ably::Models::ProtocolMessage] protocol_message
  # @return [void]
  # @api private
  def send_protocol_message(protocol_message)
    add_message_serial_if_ack_required_to(protocol_message) do
      Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message|
        add_message_to_outgoing_queue protocol_message
        notify_message_dispatcher_of_new_message protocol_message
        logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}")
      end
    end
  end

  # @api private
  def add_message_to_outgoing_queue(protocol_message)
    __outgoing_message_queue__ << protocol_message
  end

  # @api private
  def notify_message_dispatcher_of_new_message(protocol_message)
    __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
  end

  # @api private
  def create_websocket_transport
    raise ArgumentError, 'Block required' unless block_given?

    blocking_operation = proc do
      URI(client.endpoint).tap do |endpoint|
        url_params = client.auth.auth_params.merge(
          timestamp: as_since_epoch(Time.now),
          format: client.protocol,
          echo: client.echo_messages
        )

        if connection_resumable?
          url_params.merge! resume: key, connection_serial: serial
          logger.debug "Resuming connection key #{key} with serial #{serial}"
        elsif connection_recoverable?
          url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
          logger.debug "Recovering connection with key #{client.recover}"
          once(:connected, :closed, :failed) do
            client.disable_automatic_connection_recovery
          end
        end

        endpoint.query = URI.encode_www_form(url_params)
      end.to_s
    end

    callback = proc do |url|
      determine_host do |host|
        begin
          logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
          @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
            yield websocket_transport if block_given?
          end
        rescue EventMachine::ConnectionError => error
          manager.connection_opening_failed error
        end
      end
    end

    # client.auth.auth_params is a blocking call, so defer this into a thread
    EventMachine.defer blocking_operation, callback
  end

  # @api private
  def release_websocket_transport
    @transport = nil
  end

  # @api private
  def set_failed_connection_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # Executes registered callbacks for a successful connection resume event
  # @api private
  def resumed
    resume_callbacks.each(&:call)
  end

  # Provides a simple hook to inject a callback when a connection is successfully resumed
  # @api private
  def on_resume(&callback)
    resume_callbacks << callback
  end

  # Remove a registered connection resume callback
  # @api private
  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private

  # The client serial is incremented for every message that is published that requires an ACK.
  # Note that this is different to the connection serial that contains the last known serial number
  # received from the server.
  #
  # A client serial number therefore does not guarantee a message has been received, only sent.
  # A connection serial guarantees the server has received the message and is thus used for connection
  # recovery and resumes.
  # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
  attr_reader :client_serial

  def resume_callbacks
    @resume_callbacks ||= []
  end

  def create_pub_sub_message_bus
    Ably::Util::PubSub.new(
      coerce_into: Proc.new do |event|
        raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
        :protocol_message
      end
    )
  end

  def add_message_serial_if_ack_required_to(protocol_message)
    if Ably::Models::ProtocolMessage.ack_required?(protocol_message[:action])
      add_message_serial_to(protocol_message) { yield }
    else
      yield
    end
  end

  def add_message_serial_to(protocol_message)
    @client_serial += 1
    protocol_message[:msgSerial] = client_serial
    yield
  rescue StandardError => e
    @client_serial -= 1
    raise e
  end

  # Simply wait until the next EventMachine tick to ensure Connection initialization is complete
  def when_initialized
    EventMachine.next_tick { yield }
  end

  def connection_resumable?
    !key.nil? && !serial.nil?
  end

  def connection_recoverable?
    connection_recover_parts
  end

  def connection_recover_parts
    client.recover.to_s.match(RECOVER_REGEX)
  end

  def can_use_fallback_hosts?
    if client.environment.nil? && client.custom_realtime_host.nil?
      if connecting? && previous_state
        use_fallback_if_disconnected? || use_fallback_if_suspended?
      end
    end
  end

  def use_fallback_if_disconnected?
    second_reconnect_attempt_for(:disconnected, 1)
  end

  def use_fallback_if_suspended?
    second_reconnect_attempt_for(:suspended, 2) # on first suspended state use default Ably host again
  end

  def second_reconnect_attempt_for(state, first_attempt_count)
    previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
  end
end
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Underlying socket transport used for this connection, for internal use by the client library
# File 'lib/ably/realtime/connection.rb', line 85

def transport
  @transport
end
Instance Method Details
#add_message_to_outgoing_queue(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 305

def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end
#clear_error_reason ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 369

def clear_error_reason
  @error_reason = nil
end
#close {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable
Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to #connect.
# File 'lib/ably/realtime/connection.rb', line 125

def close(&success_block)
  unless closing? || closed?
    raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end
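The guard in #close can be pictured with a synchronous toy model. This is only a sketch under stated assumptions: `SketchConnection` is hypothetical, it raises a plain RuntimeError rather than the library's own exception, and it returns the state symbol where the real method returns an EventMachine::Deferrable. The only transition rule it encodes is the one stated above, that closing is refused from the failed state.

```ruby
# Hypothetical, synchronous stand-in for the connection; the real method
# returns an EventMachine::Deferrable and transitions asynchronously.
class SketchConnection
  attr_reader :state

  def initialize(state)
    @state = state
  end

  def close
    unless %i[closing closed].include?(state)
      # The documentation above excludes only the failed state.
      raise "Cannot transition from #{state} to closing" if state == :failed
      @state = :closing
    end
    state
  end
end

p SketchConnection.new(:connected).close
p SketchConnection.new(:closed).close
```

Calling close on a connection that is already closing or closed is a no-op, which matches the `unless closing? || closed?` check in the source.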
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.
# File 'lib/ably/realtime/connection.rb', line 213

def configure_new(connection_id, connection_key, connection_serial)
  @id = connection_id
  @key = connection_key
  @client_serial = connection_serial

  update_connection_serial connection_serial
end
#connect {|Ably::Realtime::Connection| ... } ⇒ EventMachine::Deferrable
Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
# File 'lib/ably/realtime/connection.rb', line 140

def connect(&success_block)
  unless connecting? || connected?
    raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
    transition_state_machine :connecting
  end
  deferrable_for_state_change_to(STATE.Connected, &success_block)
end
#create_websocket_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 315

def create_websocket_transport
  raise ArgumentError, 'Block required' unless block_given?

  blocking_operation = proc do
    URI(client.endpoint).tap do |endpoint|
      url_params = client.auth.auth_params.merge(
        timestamp: as_since_epoch(Time.now),
        format: client.protocol,
        echo: client.echo_messages
      )

      if connection_resumable?
        url_params.merge! resume: key, connection_serial: serial
        logger.debug "Resuming connection key #{key} with serial #{serial}"
      elsif connection_recoverable?
        url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
        logger.debug "Recovering connection with key #{client.recover}"
        once(:connected, :closed, :failed) do
          client.disable_automatic_connection_recovery
        end
      end

      endpoint.query = URI.encode_www_form(url_params)
    end.to_s
  end

  callback = proc do |url|
    determine_host do |host|
      begin
        logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
        @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
          yield websocket_transport if block_given?
        end
      rescue EventMachine::ConnectionError => error
        manager.connection_opening_failed error
      end
    end
  end

  # client.auth.auth_params is a blocking call, so defer this into a thread
  EventMachine.defer blocking_operation, callback
end
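The query-string step of the method above can be tried in isolation with Ruby's standard `uri` library. The sketch below is an assumption-laden simplification: `build_transport_url`, the example endpoint and the sample parameter values are hypothetical, and the timestamp and recover branches are left out.

```ruby
require 'uri'

# Hypothetical helper mirroring the query-building step above.
# `auth_params`, `resume_key` and `serial` stand in for values the library
# obtains from the client and the current connection.
def build_transport_url(endpoint, auth_params, resume_key: nil, serial: nil)
  URI(endpoint).tap do |uri|
    params = auth_params.merge(format: 'json', echo: true)
    # Only ask the server to resume when both parts are known.
    params = params.merge(resume: resume_key, connection_serial: serial) if resume_key && serial
    uri.query = URI.encode_www_form(params)
  end.to_s
end

puts build_transport_url('wss://realtime.example.com', { key: 'app.id:secret' },
                         resume_key: 'abc', serial: 7)
```

`URI.encode_www_form` percent-encodes each value, so characters such as the colon in a key survive the round trip through the query string.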
#determine_host {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Determines the correct host name to use for the next connection attempt and updates current_host
# File 'lib/ably/realtime/connection.rb', line 253

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if can_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end
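Stripped of the asynchronous internet check, the branch logic above reduces to a small decision: a fallback host is chosen only when fallbacks are permitted and the internet check succeeded. The `choose_host` function and host names below are hypothetical and exist purely to show that decision.

```ruby
# Hypothetical pure function capturing the branch logic of #determine_host.
def choose_host(primary:, fallback:, can_use_fallback:, internet_up:)
  return primary unless can_use_fallback

  internet_up ? fallback : primary
end

puts choose_host(primary: 'primary.example.com', fallback: 'fallback.example.com',
                 can_use_fallback: true, internet_up: true)
puts choose_host(primary: 'primary.example.com', fallback: 'fallback.example.com',
                 can_use_fallback: true, internet_up: false)
```

When the internet itself appears to be down, switching hosts would not help, so the primary host is kept.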
#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 186

def internet_up?
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail
      end
      http.callback do
        result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
        yield result if block_given?
        deferrable.succeed
      end
    end
  end
end
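Two small pieces of the method above can be checked without a network: how the scheme is chosen for the check URL, and how a response is judged. In this sketch `CHECK_URL` and `OK_TEXT` are made-up placeholders for the values the library reads from Ably::INTERNET_CHECK, and both helper names are hypothetical.

```ruby
# Placeholder values; the library reads the real ones from Ably::INTERNET_CHECK.
CHECK_URL = '//internet-check.example.com/up.txt'
OK_TEXT = 'yes'

# Prefixes the scheme-relative URL with http: or https: depending on TLS.
def check_url(use_tls)
  "http#{'s' if use_tls}:#{CHECK_URL}"
end

# A response counts as "up" only with status 200 and the expected body.
def response_ok?(status, body)
  status == 200 && body.strip == OK_TEXT
end

puts check_url(true)
puts response_ok?(200, "yes\n")
```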
#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 310

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end
#off_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove a registered connection resume callback
# File 'lib/ably/realtime/connection.rb', line 387

def off_resume(&callback)
  resume_callbacks.delete(callback)
end
#on_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides a simple hook to inject a callback when a connection is successfully resumed
# File 'lib/ably/realtime/connection.rb', line 381

def on_resume(&callback)
  resume_callbacks << callback
end
#ping {|Integer| ... } ⇒ void
This method returns an undefined value.
Sends a ping to Ably and yields to the provided block when the heartbeat ping request is echoed by the server. This can be useful for measuring the true round-trip latency between the client and the Ably server for a simple message, or for checking that the underlying transport is currently responding. The elapsed time in milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the :connected state.
# File 'lib/ably/realtime/connection.rb', line 162

def ping(&block)
  raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
  raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

  started = nil

  wait_for_ping = Proc.new do |protocol_message|
    if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
      safe_yield block, time_passed if block_given?
    end
  end

  once_or_if(STATE.Connected) do
    started = Time.now
    send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
    __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
  end
end
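The value yielded to the ping block is an integer number of milliseconds, derived from two Time values. The sketch below isolates that arithmetic; the `elapsed_ms` helper is hypothetical, and fixed timestamps are used so the result does not depend on the clock.

```ruby
# Hypothetical helper isolating the elapsed-time arithmetic used by #ping.
def elapsed_ms(started, finished = Time.now)
  (finished.to_f * 1000 - started.to_f * 1000).to_i
end

started = Time.at(100.0)
finished = Time.at(100.25)
puts elapsed_ms(started, finished)
```

Because the result is truncated with `to_i`, fractions of a millisecond are discarded rather than rounded.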
#release_websocket_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 359

def release_websocket_transport
  @transport = nil
end
#reset_resume_info ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Disable automatic resume of a connection
# File 'lib/ably/realtime/connection.rb', line 231

def reset_resume_info
  @key = nil
  @serial = nil
end
#resumed ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes registered callbacks for a successful connection resume event
# File 'lib/ably/realtime/connection.rb', line 375

def resumed
  resume_callbacks.each(&:call)
end
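The three resume hooks (#on_resume, #off_resume and #resumed) form a simple callback registry. The following sketch reproduces that pattern in a hypothetical `ResumeHooks` class so it can be run on its own; it is not the library's class and omits everything unrelated to the callbacks.

```ruby
# Hypothetical stand-in exposing the same three methods as the private API above.
class ResumeHooks
  def on_resume(&callback)
    resume_callbacks << callback
  end

  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  def resumed
    resume_callbacks.each(&:call)
  end

  private

  def resume_callbacks
    @resume_callbacks ||= []
  end
end

log = []
hooks = ResumeHooks.new
keep = proc { log << :keep }
drop = proc { log << :drop }
hooks.on_resume(&keep)
hooks.on_resume(&drop)
hooks.off_resume(&drop) # pass the same Proc object that was registered
hooks.resumed
p log
```

Removal relies on passing the very Proc that was registered; a new block with identical code would not be found in the list.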
#send_protocol_message(protocol_message) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent
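# File 'lib/ably/realtime/connection.rb', line 294

def send_protocol_message(protocol_message)
  add_message_serial_if_ack_required_to(protocol_message) do
    Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message|
      add_message_to_outgoing_queue protocol_message
      notify_message_dispatcher_of_new_message protocol_message
      logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}")
    end
  end
end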
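The class source on this page notes that the client serial starts at -1, is incremented for each message that requires an ACK, and is rolled back if queuing the message raises. A minimal sketch of that bookkeeping follows; `SerialCounter` and `with_serial` are hypothetical names, and only the `msgSerial` field name is taken from the source.

```ruby
# Hypothetical stand-in for the private serial bookkeeping; `msgSerial` is the
# field name used in the source, everything else is illustrative.
class SerialCounter
  attr_reader :client_serial

  def initialize
    @client_serial = -1 # no messages sent yet
  end

  # Assigns the next serial to the message, rolling back if the block fails.
  def with_serial(message)
    @client_serial += 1
    message[:msgSerial] = client_serial
    yield message
  rescue StandardError
    @client_serial -= 1
    raise
  end
end

counter = SerialCounter.new
p counter.with_serial({ action: :message }) { |message| message }
p counter.client_serial
```

The rollback keeps serials contiguous: a message that never reached the queue does not consume a number.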
#set_failed_connection_error_reason(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 364

def set_failed_connection_error_reason(error)
  @error_reason = error
end
#update_connection_serial(connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Store last received connection serial so that the connection can be resumed from the last known point-in-time
# File 'lib/ably/realtime/connection.rb', line 224

def update_connection_serial(connection_serial)
  @serial = connection_serial
end