Class: Ably::Realtime::Client

Inherits:
Object
Extended by:
Forwardable
Includes:
Modules::AsyncWrapper, Modules::Conversions, Ably::Realtime::Channel::Publisher
Defined in:
lib/ably/realtime/client.rb,
lib/ably/realtime/client/incoming_message_dispatcher.rb,
lib/ably/realtime/client/outgoing_message_dispatcher.rb

Overview

Client for the Ably Realtime API

Defined Under Namespace

Classes: IncomingMessageDispatcher, OutgoingMessageDispatcher

Constant Summary

DOMAIN =
'realtime.ably.io'

Constructor Details

#initialize(options) ⇒ Ably::Realtime::Client

Creates a Realtime Client and configures the Auth object for the connection.

Examples:

# create a new client authenticating with basic auth
client = Ably::Realtime::Client.new('key.id:secret')

# create a new client and configure a client ID used for presence
client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')

Parameters:

  • options (Hash, String)

    an options Hash used to configure the client and the authentication, or String with an API key or Token ID

Options Hash (options):

  • :auth_callback (Proc)

    when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required. Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread. The Proc should return a token string, Models::TokenDetails or JSON equivalent, Models::TokenRequest or JSON equivalent

  • :queue_messages (Boolean)

    If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states

  • :echo_messages (Boolean)

    If false, prevents messages originating from this connection being echoed back on the same connection

  • :recover (String)

    When a recover option is specified, a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library. Please refer to the API documentation for further information on connection state recovery.

  • :auto_connect (Boolean)

    By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.

  • :channel_retry_timeout (Integer) — default: 15 seconds

    When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED

  • :disconnected_retry_timeout (Integer) — default: 15 seconds

    When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically

  • :suspended_retry_timeout (Integer) — default: 30 seconds

    When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically

  • :disable_websocket_heartbeats (Boolean)

    WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes

  • :tls (Boolean) — default: true

    When false, TLS is disabled. Please note Basic Auth is disallowed without TLS as secrets cannot be transmitted over unsecured connections.

  • :key (String)

    API key comprising the key name and key secret in a single string

  • :token (String)

    Token string or Models::TokenDetails used to authenticate requests

  • :token_details (Models::TokenDetails)

    Models::TokenDetails used to authenticate requests

  • :use_token_auth (Boolean)

    Will force Basic Auth if set to false, and Token auth if set to true

  • :environment (String)

    Specify 'sandbox' when testing the client library against an alternate Ably environment

  • :protocol (Symbol) — default: :msgpack

    Protocol used to communicate with Ably, :json and :msgpack currently supported

  • :use_binary_protocol (Boolean) — default: true

    When true, the MessagePack binary protocol is used; when false, JSON encoding is used. This option overrides the :protocol option

  • :log_level (Logger::Severity, Symbol) — default: Logger::WARN

    Log level for the standard Logger that outputs to STDOUT. Can be set to :fatal (Logger::FATAL), :error (Logger::ERROR), :warn (Logger::WARN), :info (Logger::INFO), :debug (Logger::DEBUG) or :none

  • :logger (Logger)

    A custom logger can be used however it must adhere to the Ruby Logger interface, see www.ruby-doc.org/stdlib-1.9.3/libdoc/logger/rdoc/Logger.html

  • :client_id (String)

    client ID identifying this connection to other clients

  • :auth_url (String)

    a URL to be used to GET or POST a set of token request params, to obtain a signed token request

  • :auth_headers (Hash)

    a set of application-specific headers to be added to any request made to the auth_url

  • :auth_params (Hash)

    a set of application-specific query params to be added to any request made to the auth_url

  • :auth_method (Symbol) — default: :get

    HTTP method to use with auth_url, must be either :get or :post

  • :auth_callback (Proc)

    when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required. The Proc should return a token string, Models::TokenDetails or JSON equivalent, Models::TokenRequest or JSON equivalent

  • :query_time (Boolean)

    when true will query the Ably system for the current time instead of using the local time

  • :default_token_params (Hash)

    convenience to pass in token_params that will be used as a default for all token requests. See Auth#create_token_request

  • :http_open_timeout (Integer) — default: 4 seconds

    timeout in seconds for opening an HTTP connection for all HTTP requests

  • :http_request_timeout (Integer) — default: 10 seconds

    timeout in seconds for any single complete HTTP request and response

  • :http_max_retry_count (Integer) — default: 3

    maximum number of fallback host retries for HTTP requests that fail due to network issues or server problems

  • :http_max_retry_duration (Integer) — default: 15 seconds

    maximum elapsed time in which fallback host retries for HTTP requests will be attempted, e.g. if the first default host attempt takes 10s and the subsequent fallback retry attempt takes 7s, no further fallback host attempts will be made, as the total elapsed time of 17s exceeds the default 15s limit

  • :fallback_hosts_use_default (Boolean) — default: false

    When true, forces the use of fallback hosts even if a non-default production endpoint is being used

  • :fallback_hosts (Array<String>)

    When an array of fallback hosts is provided, these fallback hosts are always used if a request to the primary endpoint fails. If an empty array is provided, the fallback host functionality is disabled

  • :fallback_retry_timeout (Integer) — default: 600 seconds

    amount of time in seconds a REST client will continue to use a working fallback host when the primary fallback host has previously failed

  • :add_request_ids (Boolean) — default: false

    When true, adds a unique request_id to each request sent to Ably servers. This is handy when reporting issues, because you can refer to a specific request.

  • :idempotent_rest_publishing (Boolean) — default: false if ver < 1.2

    When true, idempotent publishing is enabled for all messages published via REST
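
The two HTTP retry limits above (:http_max_retry_count and :http_max_retry_duration) interact: retries stop as soon as either limit is reached. The following is a minimal sketch of that interaction, not the library's implementation. The helper name `attempts_made` and its exact stopping rule are assumptions made for illustration.

```ruby
# Hypothetical helper (not part of the Ably library): given the duration in
# seconds of each successive failed attempt, count how many attempts are made
# before the retry-count cap or the elapsed-time cap stops further retries.
def attempts_made(durations, max_retry_count: 3, max_retry_duration: 15)
  elapsed = 0
  attempts = 0
  durations.each do |seconds|
    retries_so_far = attempts - 1
    # The first attempt is always made; later ones need remaining budget.
    if attempts > 0 && (retries_so_far >= max_retry_count || elapsed >= max_retry_duration)
      break
    end
    attempts += 1
    elapsed += seconds
  end
  attempts
end

# One initial attempt plus one fallback retry, then the time budget is spent.
puts attempts_made([10, 7, 1])
# Fast failures are instead capped by the retry count.
puts attempts_made([1, 1, 1, 1, 1, 1])
```

Under these assumptions, a 10s attempt followed by a 7s retry uses up the 15s budget after two attempts, while six 1s failures stop after the initial attempt plus three retries.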

Raises:

  • (ArgumentError)

# File 'lib/ably/realtime/client.rb', line 100

def initialize(options)
  raise ArgumentError, 'Options Hash is expected' if options.nil?

  options = options.clone
  if options.kind_of?(String)
    options = if options.match(Ably::Auth::API_KEY_REGEX)
      { key: options }
    else
      { token: options }
    end
  end

  @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
  @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
  @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
  @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
  @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
  @recover               = rest_client.options[:recover]

  raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

  @auth       = Ably::Realtime::Auth.new(self)
  @channels   = Ably::Realtime::Channels.new(self)
  @connection = Ably::Realtime::Connection.new(self, options)
end
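
The constructor above does two things before building its collaborators: it turns a String argument into an options Hash, and it coerces boolean options so that only an explicit `false` disables them. The sketch below reproduces that logic in isolation. `HYPOTHETICAL_API_KEY_REGEX` is a stand-in chosen for this example, since the real `Ably::Auth::API_KEY_REGEX` is not shown here, and `normalize_options` and `enabled?` are illustrative names.

```ruby
# Assumption: a simplified pattern standing in for Ably::Auth::API_KEY_REGEX.
HYPOTHETICAL_API_KEY_REGEX = /\A[\w-]+\.[\w-]+:[\w-]+\z/

# A String is treated as an API key if it looks like one, otherwise as a token.
def normalize_options(options)
  raise ArgumentError, 'Options Hash is expected' if options.nil?
  return options.clone unless options.is_a?(String)

  if options.match?(HYPOTHETICAL_API_KEY_REGEX)
    { key: options }
  else
    { token: options }
  end
end

# Equivalent to `options.fetch(name, true) == false ? false : true`:
# a missing or nil value leaves the feature enabled.
def enabled?(options, name)
  options.fetch(name, true) != false
end

p normalize_options('key.id:secret')
p normalize_options('an-opaque-token')
p enabled?({ echo_messages: false }, :echo_messages)
p enabled?({}, :queue_messages)
```

Because only `false` switches a feature off, passing `nil` for :echo_messages, :queue_messages or :auto_connect keeps the default behaviour.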

Instance Attribute Details

#auth ⇒ Ably::Auth (readonly)

Auth authentication object configured for this connection

Returns:

  • (Ably::Auth)

# File 'lib/ably/realtime/client.rb', line 37

def auth
  @auth
end

#auth_options ⇒ Hash (readonly)

Returns Auth options configured for this client

Returns:

  • (Hash)

    Auth options configured for this client


# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime websocket host that is being used if it was provided with the option `:realtime_host` or `:ws_host` when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated HTTP response
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The channel name you want to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload unless an Array of [Ably::Models::Message] objects is passed in the first argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not unavailable'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
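
The `endpoint`, `endpoint_for_host` and `fallback_endpoint` methods in the listing above can be tried out in isolation with only the standard library. The sketch below mirrors their logic under some assumptions: `REALTIME_DOMAIN` copies the value of `DOMAIN`, the `tls:` and `port:` keyword arguments replace the delegated `use_tls?`, `custom_tls_port` and `custom_port` calls, and the host names in the usage lines are placeholders.

```ruby
require 'uri'

REALTIME_DOMAIN = 'realtime.ably.io' # same value as Client::DOMAIN above

# A nil environment is dropped by compact, leaving the bare production domain.
def default_host(environment)
  [environment, REALTIME_DOMAIN].compact.join('-')
end

# Builds a ws:// or wss:// URI, adding a port only when one is given.
def endpoint_for_host(host, tls: true, port: nil)
  raise ArgumentError, 'Custom port must be an Integer or nil' if port && !port.is_a?(Integer)

  options = { scheme: tls ? 'wss' : 'ws', host: host }
  options[:port] = port if port
  URI::Generic.build(options)
end

# Selects an endpoint from a list ordered as: fallbacks first, primary last.
# retry_count is the combined number of disconnected and suspended retries.
def fallback_endpoint_for(retry_count, endpoints)
  endpoints[(retry_count - 1) % endpoints.count]
end

puts endpoint_for_host(default_host(nil))
puts endpoint_for_host(default_host('sandbox'), tls: false, port: 8080)

# Placeholder hosts; the library shuffles its fallback hosts before use.
endpoints = %w[a.example.com b.example.com].map { |h| endpoint_for_host(h) }
endpoints << endpoint_for_host(default_host(nil))
puts fallback_endpoint_for(3, endpoints)
```

The modulo in `fallback_endpoint_for` means the rotation wraps around: once every fallback and then the primary host have been tried, the next retry starts again from the first fallback.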

#auto_connect ⇒ Boolean (readonly)

When true, as soon as the client library is instantiated it will connect to Ably. If this attribute is false, a connection must be opened explicitly

Returns:

  • (Boolean)

# File 'lib/ably/realtime/client.rb', line 62

def auto_connect
  @auto_connect
end

#channels ⇒ Ably::Realtime::Channels (readonly)

The collection of Ably::Realtime::Channels that have been created

Returns:

  • (Ably::Realtime::Channels)

# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime websocket host that is being used if it was provided with the option `:realtime_host` or `:ws_host` when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated HTTP response
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The name of the channel to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is unavailable'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
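
The constructor in the listing above accepts either a Hash or a bare String, and decides whether a String is an API key or a token by matching it against Ably::Auth::API_KEY_REGEX. The following is a minimal stand-alone sketch of that branch, not the library's code. The pattern ASSUMED_API_KEY_REGEX and the helper normalize_client_options are hypothetical names introduced for illustration; the real regex is not shown on this page.

```ruby
# Stand-in for Ably::Auth::API_KEY_REGEX. The real pattern is not shown in
# the listing, so this one is an assumption for illustration only.
ASSUMED_API_KEY_REGEX = /\A[\w\-]+\.[\w\-]+:[\w\-]+\z/

# Mirrors the String handling at the top of Client#initialize:
# a key-shaped String becomes { key: ... }, any other String { token: ... },
# and a Hash is copied and passed through unchanged.
def normalize_client_options(options)
  raise ArgumentError, 'Options Hash is expected' if options.nil?

  return options.dup unless options.is_a?(String)

  if options.match?(ASSUMED_API_KEY_REGEX)
    { key: options }
  else
    { token: options }
  end
end

p normalize_client_options('key.id:secret')
p normalize_client_options('opaque-token')
p normalize_client_options({ key: 'key.id:secret', client_id: 'john' })
```

Under these assumptions, a String without the key shape is always treated as a token, so a mistyped API key would surface later as an authentication failure rather than as an ArgumentError at construction time.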

#client_id ⇒ String (readonly)

Returns a client ID, used for identifying this client for presence purposes

Returns:

  • (String)

    A client ID, used for identifying this client for presence purposes


# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime WebSocket host in use, if one was provided with the `:realtime_host` or `:ws_host` option when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated HTTP response
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The name of the channel to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is unavailable'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
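
The publish method in the listing above first turns its arguments into an Array of messages, then rejects batches larger than Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE. A rough stand-alone sketch of that normalization step follows. ASSUMED_MAX_BATCH_SIZE and build_publish_batch are hypothetical names, the limit of 50 is an assumption because the real value is not shown here, and the sketch raises ArgumentError where the library returns a failed deferrable.

```ruby
# Hypothetical limit standing in for
# Ably::Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE.
ASSUMED_MAX_BATCH_SIZE = 50

# Returns the Array of messages that would be enqueued.
# An Enumerable passed as `name` is used as the batch as-is; otherwise a
# single message Hash is built from name, data and the extra attributes.
def build_publish_batch(name, data = nil, attributes = {})
  messages = if name.is_a?(Enumerable)
    name
  else
    [{ name: name, data: data }.merge(attributes)]
  end

  if messages.length > ASSUMED_MAX_BATCH_SIZE
    raise ArgumentError, "Cannot publish more than #{ASSUMED_MAX_BATCH_SIZE} messages in a single request"
  end

  messages
end

p build_publish_batch('click', { x: 1, y: 2 }, { client_id: 'john' })
p build_publish_batch([{ name: 'click', data: 1 }, { name: 'click', data: 2 }]).length
```

Note that data and attributes are ignored when an Array is passed, which matches the parameter description stating that attributes apply only when name is nil or a String.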

#connection ⇒ Ably::Realtime::Connection (readonly)

The underlying connection for this client

Returns:

  • (Ably::Realtime::Connection)

# File 'lib/ably/realtime/client.rb', line 41

def connection
  @connection
end
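
The connection object also drives host selection: fallback_endpoint in the class listing adds the connection manager's retry counts for the disconnected and suspended states, subtracts one, and uses the result modulo the list length to pick a host. The sketch below reproduces only that arithmetic with plain Arrays. fallback_candidates and pick_fallback are hypothetical helper names, and the host names are placeholders.

```ruby
# Builds the ordered candidate list: shuffled fallbacks first, then the
# primary host as a last resort. The `random:` argument exists only to make
# the sketch repeatable.
def fallback_candidates(fallback_hosts, primary_host, random: Random.new)
  fallback_hosts.shuffle(random: random) + [primary_host]
end

# Picks a candidate from the combined retry count, wrapping around when the
# list is exhausted. A combined count of 1 maps to index 0.
def pick_fallback(candidates, disconnected_retries, suspended_retries)
  index = disconnected_retries + suspended_retries - 1
  candidates[index % candidates.count]
end

candidates = fallback_candidates(
  %w[a.example.test b.example.test],
  'primary.example.test',
  random: Random.new(42)
)

(1..4).each do |retries|
  puts "retry #{retries}: #{pick_fallback(candidates, retries, 0)}"
end
```

Because Ruby's modulo of a negative number is non-negative, a combined retry count of zero yields index -1 and therefore selects the last entry, which is the primary host in this arrangement.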

#custom_realtime_host ⇒ String, Nil (readonly)

The custom realtime WebSocket host in use, if one was provided with the `:realtime_host` or `:ws_host` option when the Ably::Realtime::Client was created

Returns:

  • (String, Nil)

# File 'lib/ably/realtime/client.rb', line 58

def custom_realtime_host
  @custom_realtime_host
end
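
When custom_realtime_host is set it takes precedence over the default domain; otherwise the endpoint method joins the optional environment and DOMAIN with a hyphen, and endpoint_for_host builds a ws or wss URI from the result. The following sketch approximates that behaviour using only Ruby's standard uri library. realtime_endpoint and its keyword arguments are hypothetical, and ws.example.test is a placeholder host.

```ruby
require 'uri'

DEFAULT_DOMAIN = 'realtime.ably.io' # same value as Client::DOMAIN

# Approximates Client#endpoint plus endpoint_for_host:
# a custom host wins, otherwise "<environment>-<domain>" or just the domain.
def realtime_endpoint(custom_host: nil, environment: nil, tls: true, port: nil)
  raise ArgumentError, 'Custom port must be an Integer or nil' if port && !port.is_a?(Integer)

  host = custom_host || [environment, DEFAULT_DOMAIN].compact.join('-')
  parts = { scheme: tls ? 'wss' : 'ws', host: host }
  parts[:port] = port if port # omit the port entirely when none is configured

  URI::Generic.build(parts)
end

puts realtime_endpoint
puts realtime_endpoint(environment: 'sandbox')
puts realtime_endpoint(custom_host: 'ws.example.test', tls: false, port: 8080)
```

In this sketch a single port argument stands in for the separate custom_tls_port and custom_port settings that the library selects between based on use_tls?.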

#echo_messages ⇒ Boolean (readonly)

When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true

Returns:

  • (Boolean)

# File 'lib/ably/realtime/client.rb', line 50

def echo_messages
  @echo_messages
end
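
The constructor derives this flag, along with queue_messages and auto_connect, using the expression fetch(key, true) == false ? false : true, so only an explicit false turns the behaviour off. A small sketch of that coercion follows; option_enabled? is a hypothetical helper name, not part of the library.

```ruby
# Returns false only when the option is explicitly set to false.
# A missing key, nil, or any other value leaves the feature enabled.
def option_enabled?(options, key)
  options.fetch(key, true) == false ? false : true
end

p option_enabled?({}, :echo_messages)
p option_enabled?({ echo_messages: false }, :echo_messages)
p option_enabled?({ echo_messages: nil }, :echo_messages)
p option_enabled?({ echo_messages: 'false' }, :echo_messages)
```

One consequence worth noting is that nil and the String 'false' both leave the option enabled, so configuration loaded from text sources would need converting to a real Boolean first.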

#encoders ⇒ Array<Ably::Models::MessageEncoder::Base> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The registered encoders that are used to encode and decode message payloads

Returns:

  • (Array<Ably::Models::MessageEncoder::Base>)

# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime WebSocket host in use, if one was provided with the `:realtime_host` or `:ws_host` option when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated HTTP response
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The name of the channel to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. On each connection retry, the next host is taken from a shuffled list of the {Ably::FALLBACK_HOSTS default fallback hosts} or the provided fallback hosts, with the primary endpoint tried last
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not available'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
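
The String handling in the constructor above can be sketched in isolation. This is a minimal illustration, not the library's code: `API_KEY_PATTERN` is a hypothetical stand-in for `Ably::Auth::API_KEY_REGEX` (whose exact pattern is not shown here), and `normalize_client_options` and `flag_enabled?` are names invented for the example.

```ruby
# Hypothetical stand-in for Ably::Auth::API_KEY_REGEX; the real pattern may differ.
API_KEY_PATTERN = /\A[\w-]{2,}\.[\w-]{2,}:[\w-]{2,}\z/

# Mirrors the String handling in Client#initialize: a String that looks like
# an API key becomes { key: ... }, any other String is treated as a token.
def normalize_client_options(options)
  raise ArgumentError, 'Options Hash is expected' if options.nil?

  return options.dup unless options.is_a?(String)

  options.match?(API_KEY_PATTERN) ? { key: options } : { token: options }
end

# Mirrors `fetch(:flag, true) == false ? false : true`: only an explicit
# false disables the flag; nil or a missing key leaves it enabled.
def flag_enabled?(options, name)
  options.fetch(name, true) != false
end
```

Under these assumptions, `normalize_client_options('key.id:secret')` yields a Hash with a `:key` entry, while a String without the `id:secret` shape is passed on as a `:token`.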

#endpoint ⇒ URI::Generic (readonly)

Returns the default Ably Realtime endpoint used for all requests

Returns:

  • (URI::Generic)

    Default Ably Realtime endpoint used for all requests


# File 'lib/ably/realtime/client.rb', line 257

def endpoint
  endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
end
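
As a rough, standalone sketch of how the endpoint is assembled, the following combines the host rule from `endpoint` with the scheme and port rules from the private `endpoint_for_host` helper. The method name `realtime_endpoint` and its keyword arguments are hypothetical; in the library these values come from the underlying REST client.

```ruby
require 'uri'

DOMAIN = 'realtime.ably.io'

# Builds the websocket endpoint the way Client#endpoint and
# Client#endpoint_for_host do. The keyword arguments are hypothetical
# stand-ins for the values the client reads from its REST client.
def realtime_endpoint(environment: nil, custom_host: nil, tls: true, port: nil)
  raise ArgumentError, 'Custom port must be an Integer or nil' if port && !port.is_a?(Integer)

  # A custom host wins; otherwise the environment, if any, prefixes the domain.
  host = custom_host || [environment, DOMAIN].compact.join('-')

  parts = { scheme: tls ? 'wss' : 'ws', host: host }
  parts[:port] = port if port

  URI::Generic.build(parts)
end
```

With no arguments the host is `realtime.ably.io`; with `environment: 'sandbox'` it becomes `sandbox-realtime.ably.io`, because `compact` drops a nil environment before the parts are joined with `-`.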

#environment ⇒ String (readonly)

Custom environment to use such as 'sandbox' when testing the client library against an alternate Ably environment

Returns:

  • (String)

# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime websocket host that is being used, if one was provided with the option `:realtime_host` or `:ws_host` when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated HTTP response
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The name of the channel to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. On each connection retry, the next host is taken from a shuffled list of the {Ably::FALLBACK_HOSTS default fallback hosts} or the provided fallback hosts, with the primary endpoint tried last
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not available'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
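
The way `publish` in the listing above turns its arguments into a list of messages, and then enforces the batch limit, can be sketched without a connection. This is an illustrative approximation only: `MAX_BATCH_SIZE` is a hypothetical value standing in for `Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE`, `normalize_messages` is an invented name, and the sketch raises where the library returns a failed `SafeDeferrable`.

```ruby
# Hypothetical limit; the library reads it from
# Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE.
MAX_BATCH_SIZE = 50

# Returns the list of messages that a publish call would enqueue.
# An Enumerable passed as `name` is used as the message list as-is;
# otherwise a single message Hash is built from name, data and attributes.
def normalize_messages(name, data = nil, attributes = {})
  messages = if name.is_a?(Enumerable)
    name
  else
    [{ name: name, data: data }.merge(attributes)]
  end

  # The library fails the returned deferrable here; this sketch raises instead.
  if messages.length > MAX_BATCH_SIZE
    raise ArgumentError, "Cannot publish more than #{MAX_BATCH_SIZE} messages in a single request"
  end

  messages
end
```

Note that `attributes` are merged only in the single-message branch, which matches the documented behaviour that they apply when `name` is nil or a String.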

#fallback_endpoint ⇒ URI::Generic (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the fallback endpoint used to connect to the realtime Ably service. On each connection retry, the next host is taken from a shuffled list of the default or provided fallback hosts, with the primary endpoint tried last.

Returns:

  • (URI::Generic)

    Fallback endpoint used to connect to the realtime Ably service. On each connection retry, the next host is taken from a shuffled list of the default or provided fallback hosts, with the primary endpoint tried last.


# File 'lib/ably/realtime/client.rb', line 286

def fallback_endpoint
  unless defined?(@fallback_endpoints) && @fallback_endpoints
    @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
    @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
  end

  fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

  @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
end
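
The rotation in `fallback_endpoint` can be illustrated with plain host names. In this sketch the host names are made up, `host_for_retry` is an invented helper, and `retry_count` stands in for the sum of the `:disconnected` and `:suspended` retry counts that the connection manager reports.

```ruby
# Hypothetical host names used only for illustration.
fallback_hosts = %w[a.example.com b.example.com c.example.com]
primary_host   = 'realtime.example.com'

# As in Client#fallback_endpoint: shuffle once, then append the primary host
# so it is retried last, after every fallback has been used.
candidates = fallback_hosts.shuffle + [primary_host]

# retry_count stands in for the combined :disconnected and :suspended retry counts.
# The first retry (count 1) maps to index 0; the modulo wraps around the list.
def host_for_retry(candidates, retry_count)
  candidates[(retry_count - 1) % candidates.count]
end

(1..5).each { |n| puts "retry #{n}: #{host_for_retry(candidates, n)}" }
```

Because the list is shuffled only once and then indexed by retry count, each retry moves to the next host in a fixed random order rather than drawing a fresh random host every time.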

#protocol ⇒ Symbol (readonly)

The protocol configured for this client, either binary `:msgpack` or text based `:json`

Returns:

  • (Symbol)

# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime websocket host that is being used, if one was provided with the option `:realtime_host` or `:ws_host` when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated response for the request
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The channel name you want to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not unavailable'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end

#protocol_binary? ⇒ Boolean (readonly)

Returns true if the transport #protocol communicates with Ably using a binary protocol

Returns:

  • (Boolean)

    True if the transport #protocol communicates with Ably using a binary protocol


# File 'lib/ably/realtime/client.rb', line 23

class Client
  include Ably::Modules::AsyncWrapper
  include Ably::Realtime::Channel::Publisher
  include Ably::Modules::Conversions

  extend Forwardable

  DOMAIN = 'realtime.ably.io'

  # The collection of {Ably::Realtime::Channel}s that have been created
  # @return [Ably::Realtime::Channels]
  attr_reader :channels

  # (see Ably::Rest::Client#auth)
  attr_reader :auth

  # The underlying connection for this client
  # @return [Ably::Realtime::Connection]
  attr_reader :connection

  # The {Ably::Rest::Client REST client} instantiated with the same credentials and configuration that is used for all REST operations such as authentication
  # @return [Ably::Rest::Client]
  # @private
  attr_reader :rest_client

  # When false the client suppresses messages originating from this connection being echoed back on the same connection. Defaults to true
  # @return [Boolean]
  attr_reader :echo_messages

  # If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true
  # @return [Boolean]
  attr_reader :queue_messages

  # The custom realtime websocket host that is being used if it was provided with the option `:realtime_host` or `:ws_host` when the {Client} was created
  # @return [String,Nil]
  attr_reader :custom_realtime_host

  # When true, as soon as the client library is instantiated it will connect to Ably.  If this attribute is false, a connection must be opened explicitly
  # @return [Boolean]
  attr_reader :auto_connect

  # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @return [String,Nil]
  attr_reader :recover

  def_delegators :auth, :client_id, :auth_options
  def_delegators :@rest_client, :encoders
  def_delegators :@rest_client, :use_tls?, :protocol, :protocol_binary?
  def_delegators :@rest_client, :environment, :custom_host, :custom_port, :custom_tls_port
  def_delegators :@rest_client, :log_level

  # Creates a {Ably::Realtime::Client Realtime Client} and configures the {Ably::Auth} object for the connection.
  #
  # @param (see Ably::Rest::Client#initialize)
  # @option options (see Ably::Rest::Client#initialize)
  # @option options [Proc]                    :auth_callback       when provided, the Proc will be called with the token params hash as the first argument, whenever a new token is required.
  #                                                                Whilst the proc is called synchronously, it does not block the EventMachine reactor as it is run in a separate thread.
  #                                                                The Proc should return a token string, {Ably::Models::TokenDetails} or JSON equivalent, {Ably::Models::TokenRequest} or JSON equivalent
  # @option options [Boolean] :queue_messages If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states
  # @option options [Boolean] :echo_messages  If false, prevents messages originating from this connection being echoed back on the same connection
  # @option options [String]  :recover        When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
  # @option options [Boolean] :auto_connect   By default as soon as the client library is instantiated it will connect to Ably. You can optionally set this to false and explicitly connect.
  #
  # @option options [Integer] :channel_retry_timeout       (15 seconds). When a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  # @option options [Integer] :disconnected_retry_timeout  (15 seconds). When the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  # @option options [Integer] :suspended_retry_timeout     (30 seconds). When the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  # @option options [Boolean] :disable_websocket_heartbeats   WebSocket heartbeats are more efficient than protocol level heartbeats, however they can be disabled for development purposes
  #
  # @return [Ably::Realtime::Client]
  #
  # @example
  #    # create a new client authenticating with basic auth
  #    client = Ably::Realtime::Client.new('key.id:secret')
  #
  #    # create a new client and configure a client ID used for presence
  #    client = Ably::Realtime::Client.new(key: 'key.id:secret', client_id: 'john')
  #
  def initialize(options)
    raise ArgumentError, 'Options Hash is expected' if options.nil?

    options = options.clone
    if options.kind_of?(String)
      options = if options.match(Ably::Auth::API_KEY_REGEX)
        { key: options }
      else
        { token: options }
      end
    end

    @rest_client           = Ably::Rest::Client.new(options.merge(realtime_client: self))
    @echo_messages         = rest_client.options.fetch(:echo_messages, true) == false ? false : true
    @queue_messages        = rest_client.options.fetch(:queue_messages, true) == false ? false : true
    @custom_realtime_host  = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
    @auto_connect          = rest_client.options.fetch(:auto_connect, true) == false ? false : true
    @recover               = rest_client.options[:recover]

    raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)

    @auth       = Ably::Realtime::Auth.new(self)
    @channels   = Ably::Realtime::Channels.new(self)
    @connection = Ably::Realtime::Connection.new(self, options)
  end

  # Return a {Ably::Realtime::Channel Realtime Channel} for the given name
  #
  # @param (see Ably::Realtime::Channels#get)
  # @return (see Ably::Realtime::Channels#get)
  #
  def channel(name, channel_options = {})
    channels.get(name, channel_options)
  end

  # Retrieve the Ably service time
  #
  # @yield [Time] The time as reported by the Ably service
  # @return [Ably::Util::SafeDeferrable]
  #
  def time(&success_callback)
    async_wrap(success_callback) do
      rest_client.time
    end
  end

  # Retrieve the stats for the application
  #
  # @param (see Ably::Rest::Client#stats)
  # @option options (see Ably::Rest::Client#stats)
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Stats>] An Array of Stats
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def stats(options = {}, &success_callback)
    async_wrap(success_callback) do
      rest_client.stats(options)
    end
  end

  # (see Ably::Realtime::Connection#close)
  def close(&block)
    connection.close(&block)
  end

  # (see Ably::Realtime::Connection#connect)
  def connect(&block)
    connection.connect(&block)
  end

  # Push notification object for publishing and managing push notifications
  # @return [Ably::Realtime::Push]
  def push
    @push ||= Push.new(self)
  end

  # (see Ably::Rest::Client#request)
  # @yield [Ably::Models::HttpPaginatedResponse<>] The paginated response for the request
  #
  # @return [Ably::Util::SafeDeferrable]
  def request(method, path, params = {}, body = nil, headers = {}, &callback)
    async_wrap(callback) do
      rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
    end
  end

  # Publish one or more messages to the specified channel.
  #
  # This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
  # If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
  # is recommended. However, channel options such as encryption are not supported with this method.  If you need to specify channel options
  # we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
  # to published messages on that channel.
  #
  # Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
  #
  # @param channel_name [String]   The channel name you want to publish the message(s) to
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload, unless an Array of [Ably::Models::Message] objects is passed as the +name+ argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   client.publish 'activityChannel', 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   client.publish 'activityChannel', messages
  #
  #   client.publish('activityChannel', 'click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   client.publish('activityChannel', 'click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(channel_name, name, data = nil, attributes = {}, &success_block)
    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
      error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # @!attribute [r] endpoint
  # @return [URI::Generic] Default Ably Realtime endpoint used for all requests
  def endpoint
    endpoint_for_host(custom_realtime_host || [environment, DOMAIN].compact.join('-'))
  end

  # (see Ably::Rest::Client#register_encoder)
  def register_encoder(encoder)
    rest_client.register_encoder encoder
  end

  # (see Ably::Rest::Client#fallback_hosts)
  def fallback_hosts
    rest_client.fallback_hosts
  end

  # (see Ably::Rest::Client#logger)
  def logger
    @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
  end

  # Disable connection recovery, typically used after a connection has been recovered
  # @return [void]
  # @api private
  def disable_automatic_connection_recovery
    @recover = nil
  end

  # @!attribute [r] fallback_endpoint
  # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
  # @api private
  def fallback_endpoint
    unless defined?(@fallback_endpoints) && @fallback_endpoints
      @fallback_endpoints = fallback_hosts.shuffle.map { |fallback_host| endpoint_for_host(fallback_host) }
      @fallback_endpoints << endpoint # Try the original host last if all fallbacks have been used
    end

    fallback_endpoint_index = connection.manager.retry_count_for_state(:disconnected) + connection.manager.retry_count_for_state(:suspended) - 1

    @fallback_endpoints[fallback_endpoint_index % @fallback_endpoints.count]
  end

  # The local device details
  # @return [Ably::Models::LocalDevice]
  #
  # @note This is unsupported in the Ruby library
  def device
    raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not unavailable'
  end

  private
  def endpoint_for_host(host)
    port = if use_tls?
      custom_tls_port
    else
      custom_port
    end

    raise ArgumentError, "Custom port must be an Integer or nil" if port && !port.kind_of?(Integer)

    options = {
      scheme: use_tls? ? 'wss' : 'ws',
      host:   host
    }
    options.merge!(port: port) if port

    URI::Generic.build(options)
  end
end
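
The constructor's handling of a String argument and of the boolean options can be tried in isolation. The following is a minimal sketch rather than the library's code: `API_KEY_PATTERN` is an assumed stand-in for `Ably::Auth::API_KEY_REGEX` (whose definition is not shown in this listing), and `normalize_options` and `flag_enabled?` are hypothetical helper names.

```ruby
# Assumed stand-in for Ably::Auth::API_KEY_REGEX; the real pattern may differ.
API_KEY_PATTERN = /\A[\w-]+\.[\w-]+:[\w-]+\z/

# Hypothetical helper: turn a String argument into an options Hash, treating
# anything that does not look like an API key as a token.
def normalize_options(options)
  raise ArgumentError, 'Options Hash is expected' if options.nil?
  return options.dup unless options.is_a?(String)

  options.match?(API_KEY_PATTERN) ? { key: options } : { token: options }
end

# Hypothetical helper mirroring `fetch(name, true) == false ? false : true`:
# only an explicit false disables the flag; nil or a missing key leaves it on.
def flag_enabled?(options, name)
  options.fetch(name, true) != false
end

p normalize_options('key.id:secret')
p normalize_options('some-token-string')
p flag_enabled?({ echo_messages: false }, :echo_messages)
p flag_enabled?({ echo_messages: nil }, :echo_messages)
```

Because only an explicit `false` turns a flag off, passing `nil` for `:echo_messages`, `:queue_messages` or `:auto_connect` behaves the same as omitting the option.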

#queue_messages ⇒ Boolean (readonly)

If false, this disables the default behaviour whereby the library queues messages on a connection in the disconnected or connecting states. Defaults to true

Returns:

  • (Boolean)

# File 'lib/ably/realtime/client.rb', line 54

def queue_messages
  @queue_messages
end

#recover ⇒ String, Nil (readonly)

When a recover option is specified, a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library. Refer to the API documentation for further information on connection state recovery.

Returns:

  • (String, Nil)

# File 'lib/ably/realtime/client.rb', line 66

def recover
  @recover
end

#rest_client ⇒ Ably::Rest::Client (readonly)

The REST client instantiated with the same credentials and configuration that is used for all REST operations such as authentication

Returns:

  • (Ably::Rest::Client)

# File 'lib/ably/realtime/client.rb', line 46

def rest_client
  @rest_client
end

Instance Method Details

#channel(name, channel_options = {}) ⇒ Ably::Realtime::Channel

Return a Realtime Channel for the given name

Parameters:

  • name (String)

    The name of the channel

  • channel_options (Hash) (defaults to: {})

    Channel options, currently reserved for Encryption options

Returns:

  • (Ably::Realtime::Channel)

# File 'lib/ably/realtime/client.rb', line 131

def channel(name, channel_options = {})
  channels.get(name, channel_options)
end

#close { ... } ⇒ EventMachine::Deferrable

Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to Ably::Realtime::Connection#connect.

Yields:

  • block is called as soon as this connection is in the Closed state

Returns:

  • (EventMachine::Deferrable)

# File 'lib/ably/realtime/client.rb', line 162

def close(&block)
  connection.close(&block)
end

#connect { ... } ⇒ EventMachine::Deferrable

Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened. Succeeds when the connection is established, i.e. the state is Connected. Fails when the state becomes either Closing, Closed or Failed.

Note that if the connection remains in the disconnected and suspended states indefinitely, the Deferrable or block provided may never be called

Yields:

  • block is called as soon as this connection is in the Connected state

Returns:

  • (EventMachine::Deferrable)

# File 'lib/ably/realtime/client.rb', line 167

def connect(&block)
  connection.connect(&block)
end

#device ⇒ Ably::Models::LocalDevice

Note:

This is unsupported in the Ruby library

The local device details

Returns:

  • (Ably::Models::LocalDevice)

Raises:

  • (Ably::Exceptions::PushNotificationsNotSupported)

# File 'lib/ably/realtime/client.rb', line 301

def device
  raise Ably::Exceptions::PushNotificationsNotSupported, 'This device does not support receiving or subscribing to push notifications. The local device object is not unavailable'
end

#disable_automatic_connection_recovery ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Disable connection recovery, typically used after a connection has been recovered


# File 'lib/ably/realtime/client.rb', line 279

def disable_automatic_connection_recovery
  @recover = nil
end

#fallback_hosts ⇒ Object

The list of fallback hosts to be used by this client. If empty or nil, fallback host functionality is disabled.


# File 'lib/ably/realtime/client.rb', line 267

def fallback_hosts
  rest_client.fallback_hosts
end
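
The class source builds each endpoint with `URI::Generic.build` and selects a fallback endpoint by retry count, keeping the primary host as the last candidate. The sketch below illustrates that rotation under stated assumptions: `endpoint_for`, `candidate_endpoints` and `endpoint_for_retry` are hypothetical helpers, the `*.fallback.test` host names are placeholders, and the shuffle the library applies to the fallback hosts is omitted so the order is deterministic.

```ruby
require 'uri'

# Hypothetical helper mirroring the private endpoint_for_host method:
# choose the scheme from the TLS flag and add a port only when one is given.
def endpoint_for(host, tls: true, port: nil)
  raise ArgumentError, 'Custom port must be an Integer or nil' if port && !port.is_a?(Integer)

  parts = { scheme: tls ? 'wss' : 'ws', host: host }
  parts[:port] = port if port
  URI::Generic.build(parts)
end

# Hypothetical helper: build the candidate list with the primary endpoint last.
# The library shuffles the fallback hosts first; that step is skipped here.
def candidate_endpoints(primary_host, fallback_hosts)
  fallback_hosts.map { |host| endpoint_for(host) } << endpoint_for(primary_host)
end

# Hypothetical helper: the first retry (count 1) maps to index 0, and the
# index wraps around once every candidate has been tried.
def endpoint_for_retry(endpoints, retry_count)
  endpoints[(retry_count - 1) % endpoints.length]
end

endpoints = candidate_endpoints('realtime.ably.io', %w[a.fallback.test b.fallback.test])
puts endpoint_for_retry(endpoints, 1) # first fallback
puts endpoint_for_retry(endpoints, 3) # primary host, tried last
puts endpoint_for_retry(endpoints, 4) # wraps back to the first fallback
```

In the library the retry count is the sum of the connection manager's retry counts for the disconnected and suspended states, so the index advances on each failed attempt.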

#logger ⇒ Logger

Returns the Logger for this client. Configure the log level with the `:log_level` option; refer to #initialize

Returns:

  • (Logger)

    The Logger for this client. Configure the log_level with the `:log_level` option, refer to #initialize


# File 'lib/ably/realtime/client.rb', line 272

def logger
  @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
end

#publish(channel_name, name, data = nil, attributes = {}) {|Ably::Models::Message, Array<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable

Publish one or more messages to the specified channel.

This method allows messages to be efficiently published to Ably without instancing a Ably::Realtime::Channel object. If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method is recommended. However, channel options such as encryption are not supported with this method. If you need to specify channel options we recommend you use the Ably::Realtime::Channel publish method without attaching to each channel, unless you also want to subscribe to published messages on that channel.

Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.

Examples:

# Publish a single message
client.publish 'activityChannel', 'click', { x: 1, y: 2 }

# Publish an array of message Hashes
messages = [
  { name: 'click', data: { x: 1, y: 2 } },
  { name: 'click', data: { x: 2, y: 3 } }
]
client.publish 'activityChannel', messages

# Publish an array of Ably::Models::Message objects
messages = [
  Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
]
client.publish 'activityChannel', messages

client.publish('activityChannel', 'click', 'body') do |message|
  puts "#{message.name} event received with #{message.data}"
end

client.publish('activityChannel', 'click', 'body').errback do |error, message|
  puts "#{message.name} was not received, error #{error.message}"
end

Parameters:

  • channel_name (String)

    The channel name you want to publish the message(s) to

  • name (String, Array<Ably::Models::Message|Hash>, nil)

    The event name of the message to publish, or an Array of Ably::Models::Message objects or Hash objects with :name and :data pairs

  • data (String, ByteArray, nil) (defaults to: nil)

    The message payload, unless an Array of Ably::Models::Message objects is passed as the name argument

  • attributes (Hash, nil) (defaults to: {})

    Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string

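Yields:

  • (Ably::Models::Message, Array<Ably::Models::Message>)

    On success, will call the block with the Ably::Models::Message if a single message is published, or an Array of Ably::Models::Message when multiple messages are published

Returns:

  • (Ably::Util::SafeDeferrable)

    Deferrable that supports both success (callback) and failure (errback) callbacks
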
# File 'lib/ably/realtime/client.rb', line 231

def publish(channel_name, name, data = nil, attributes = {}, &success_block)
  if !connection.can_publish_messages?
    error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  messages = if name.kind_of?(Enumerable)
    name
  else
    name = ensure_utf_8(:name, name, allow_nil: true)
    ensure_supported_payload data
    [{ name: name, data: data }.merge(attributes)]
  end

  if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
    error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
    deferrable.callback(&success_block) if block_given?
  end
end
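
The way #publish turns its arguments into a list of messages and enforces the batch limit can be sketched without a connection. This is an illustration under assumptions, not the library's implementation: `build_messages` is a hypothetical helper, `ASSUMED_MAX_BATCH_SIZE` is a placeholder for `Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE` (whose value is not shown here), and the sketch raises where the library returns a failed `Ably::Util::SafeDeferrable`.

```ruby
# Assumed limit for illustration only; the library reads it from
# Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE.
ASSUMED_MAX_BATCH_SIZE = 50

# Hypothetical helper: normalise the arguments accepted by #publish into an
# Array of messages, then enforce the batch limit.
def build_messages(name, data = nil, attributes = {})
  messages = if name.is_a?(Enumerable)
    name # already a collection of messages; data and attributes are ignored
  else
    [{ name: name, data: data }.merge(attributes)]
  end

  if messages.length > ASSUMED_MAX_BATCH_SIZE
    raise ArgumentError, "Cannot publish more than #{ASSUMED_MAX_BATCH_SIZE} messages in a single request"
  end

  messages
end

p build_messages('click', { x: 1, y: 2 }, { client_id: 'john' })
p build_messages([{ name: 'click', data: 1 }, { name: 'click', data: 2 }]).length
```

Note that `attributes` is merged only in the single-message branch, which matches the parameter description: additional attributes apply when `name` is nil or a String.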

#push ⇒ Ably::Realtime::Push

Push notification object for publishing and managing push notifications


# File 'lib/ably/realtime/client.rb', line 173

def push
  @push ||= Push.new(self)
end

#register_encoder(encoder) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Encoders and decoders are processed in the order they are added so the first encoder will be given priority when encoding and decoding

This method returns an undefined value.

Register a message encoder and decoder that implements Ably::Models::MessageEncoders::Base interface. Message encoders are used to encode and decode message payloads automatically.

Parameters:


# File 'lib/ably/realtime/client.rb', line 262

def register_encoder(encoder)
  rest_client.register_encoder encoder
end

#request(method, path, params = {}, body = nil, headers = {}) {|Ably::Models::HttpPaginatedResponse<>| ... } ⇒ Ably::Models::HttpPaginatedResponse<>, Ably::Util::SafeDeferrable

Perform an HTTP request to the Ably API. This is a convenience for customers who wish to use bleeding edge REST API functionality that is either not documented or is not included in the API for our client libraries. The REST client library provides a function to issue HTTP requests to the Ably endpoints with all the built-in functionality of the library such as authentication, paging, fallback hosts, MsgPack and JSON support etc.

Parameters:

  • method (Symbol)

    The HTTP method symbol such as :get, :post, :put

  • path (String)

    The path of the URL such as /channel/foo/publish

  • params (Hash, nil) (defaults to: {})

    Optional querystring params

  • body (Hash, nil) (defaults to: nil)

    Optional body for the POST or PUT request, must be nil or a JSON-like object

  • headers (Hash, nil) (defaults to: {})

    Optional additional headers

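Yields:

  • (Ably::Models::HttpPaginatedResponse<>)

Returns:

  • (Ably::Models::HttpPaginatedResponse<>, Ably::Util::SafeDeferrable)
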
# File 'lib/ably/realtime/client.rb', line 181

def request(method, path, params = {}, body = nil, headers = {}, &callback)
  async_wrap(callback) do
    rest_client.request(method, path, params, body, headers, async_blocking_operations: true)
  end
end

#stats(options = {}) {|Ably::Models::PaginatedResult<Ably::Models::Stats>| ... } ⇒ Ably::Util::SafeDeferrable

Retrieve the stats for the application

Parameters:

  • options (Hash) (defaults to: {})

    the options for the stats request

Options Hash (options):

  • :start (Integer, Time)

    Ensure earliest time or millisecond since epoch for any stats retrieved is :start

  • :end (Integer, Time)

    Ensure latest time or millisecond since epoch for any stats retrieved is :end

  • :direction (Symbol)

    :forwards or :backwards, defaults to :backwards

  • :limit (Integer)

    Maximum number of messages to retrieve up to 1,000, defaults to 100

  • :unit (Symbol)

    `:minute`, `:hour`, `:day` or `:month`. Defaults to `:minute`

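Yields:

  • (Ably::Models::PaginatedResult<Ably::Models::Stats>)

    An Array of Stats

Returns:

  • (Ably::Util::SafeDeferrable)
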
# File 'lib/ably/realtime/client.rb', line 155

def stats(options = {}, &success_callback)
  async_wrap(success_callback) do
    rest_client.stats(options)
  end
end

#time {|Time| ... } ⇒ Ably::Util::SafeDeferrable

Retrieve the Ably service time

Yields:

  • (Time)

    The time as reported by the Ably service

Returns:

  • (Ably::Util::SafeDeferrable)

# File 'lib/ably/realtime/client.rb', line 140

def time(&success_callback)
  async_wrap(success_callback) do
    rest_client.time
  end
end
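
Methods such as #time, #stats and #request hand back a deferrable and optionally take a success block. The sketch below shows that callback/errback pattern in plain Ruby. It is not `Ably::Util::SafeDeferrable` or the library's `async_wrap`: `TinyDeferrable` and `wrap_blocking` are hypothetical names, and the operation runs synchronously here, whereas the library runs under EventMachine.

```ruby
# Hypothetical, minimal stand-in for a deferrable.
class TinyDeferrable
  def initialize
    @status    = :pending
    @args      = []
    @callbacks = []
    @errbacks  = []
  end

  # Register a success handler; run it at once if already succeeded.
  def callback(&block)
    if @status == :succeeded
      block.call(*@args)
    else
      @callbacks << block
    end
    self
  end

  # Register a failure handler; run it at once if already failed.
  def errback(&block)
    if @status == :failed
      block.call(*@args)
    else
      @errbacks << block
    end
    self
  end

  def succeed(*args)
    settle(:succeeded, args, @callbacks)
  end

  def fail(*args)
    settle(:failed, args, @errbacks)
  end

  private

  # A deferrable settles once; later calls to succeed or fail are ignored.
  def settle(status, args, handlers)
    return unless @status == :pending

    @status = status
    @args   = args
    handlers.each { |handler| handler.call(*args) }
  end
end

# Hypothetical wrapper: run the operation, then route its result to the
# callbacks or its exception to the errbacks.
def wrap_blocking(success_callback = nil)
  deferrable = TinyDeferrable.new
  deferrable.callback(&success_callback) if success_callback
  begin
    result = yield
  rescue StandardError => e
    deferrable.fail(e)
  else
    deferrable.succeed(result)
  end
  deferrable
end

wrap_blocking(->(value) { puts "service time: #{value}" }) { Time.now }
wrap_blocking { raise 'request failed' }.errback { |error| puts "error: #{error.message}" }
```

Handlers registered after the deferrable has settled still run, which is why chaining `.errback` onto the returned object works even though the operation has already finished.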