Class: Ably::Realtime::Channel

Inherits:
Object
  • Object
show all
Extended by:
Modules::Enum
Includes:
Modules::AsyncWrapper, Modules::Conversions, Modules::EventEmitter, Modules::EventMachineHelpers, Modules::MessageEmitter, Modules::StateEmitter, Modules::UsesStateMachine
Defined in:
lib/ably/realtime/channel.rb,
lib/ably/realtime/channel/channel_manager.rb,
lib/ably/realtime/channel/channel_state_machine.rb

Overview

The Channel class represents a Channel belonging to this application. The Channel instance allows messages to be published and received, and controls the lifecycle of this instance’s attachment to the channel.

Channels will always be in one of the following states:

initialized: 0
attaching:   1
attached:    2
detaching:   3
detached:    4
failed:      5

Note that the states are available as Enum-like constants:

Channel::STATE.Initialized
Channel::STATE.Attaching
Channel::STATE.Attached
Channel::STATE.Detaching
Channel::STATE.Detached
Channel::STATE.Failed

Defined Under Namespace

Classes: ChannelManager, ChannelStateMachine

Constant Summary collapse

STATE =

ChannelState The permited states for this channel

ruby_enum('STATE',
  :initialized,
  :attaching,
  :attached,
  :detaching,
  :detached,
  :suspended,
  :failed
)
EVENT =

ChannelEvent The permitted channel events that are emitted for this channel

ruby_enum('EVENT',
  STATE.to_sym_arr + [:update]
)
MAX_PROTOCOL_MESSAGE_BATCH_SIZE =

Max number of messages to bundle in a single ProtocolMessage

50

Instance Attribute Summary collapse

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary collapse

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::MessageEmitter

#emit_message

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_off, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client, name, channel_options = {}) ⇒ Channel

Initialize a new Channel object

Parameters:

  • client (Ably::Rest::Client)
  • name (String)

    The name of the channel

  • channel_options (Hash) (defaults to: {})

    Channel options, currently reserved for Encryption options

Options Hash (channel_options):



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/ably/realtime/channel.rb', line 95

def initialize(client, name, channel_options = {})
  name = ensure_utf_8(:name, name)

  update_options channel_options
  @client        = client
  @name          = name
  @queue         = []

  @state_machine = ChannelStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ChannelManager.new(self, client.connection)

  setup_event_handlers
  setup_presence
end

Instance Attribute Details

#__incoming_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal channel incoming message bus.

Returns:



292
293
294
295
296
# File 'lib/ably/realtime/channel.rb', line 292

def __incoming_msgbus__
  @__incoming_msgbus__ ||= Ably::Util::PubSub.new(
    coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) }
  )
end

#attached_serialInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Serial number assigned to this channel when it was attached

Returns:

  • (Integer)


86
87
88
# File 'lib/ably/realtime/channel.rb', line 86

def attached_serial
  @attached_serial
end

#clientAbly::Realtime::Client (readonly)

Ably::Realtime::Client associated with this channel



64
65
66
# File 'lib/ably/realtime/channel.rb', line 64

def client
  @client
end

#error_reasonAbly::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

When a channel failure occurs this attribute contains the Ably Exception



76
77
78
# File 'lib/ably/realtime/channel.rb', line 76

def error_reason
  @error_reason
end

#managerAbly::Realtime::Channel::ChannelManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Channel manager responsible for attaching, detaching and handling failures for this channel



81
82
83
# File 'lib/ably/realtime/channel.rb', line 81

def manager
  @manager
end

#nameString (readonly)

Channel name

Returns:

  • (String)


68
69
70
# File 'lib/ably/realtime/channel.rb', line 68

def name
  @name
end

#optionsHash (readonly)

Channel options configured for this channel, see #initialize for channel_options

Returns:

  • (Hash)


72
73
74
# File 'lib/ably/realtime/channel.rb', line 72

def options
  @options
end

#stateAbly::Realtime::Connection::STATE (readonly)

Returns channel state.

Returns:



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/ably/realtime/channel.rb', line 29

class Channel
  include Ably::Modules::Conversions
  include Ably::Modules::EventEmitter
  include Ably::Modules::EventMachineHelpers
  include Ably::Modules::AsyncWrapper
  include Ably::Modules::MessageEmitter
  extend Ably::Modules::Enum

  # ChannelState
  # The permited states for this channel
  STATE = ruby_enum('STATE',
    :initialized,
    :attaching,
    :attached,
    :detaching,
    :detached,
    :suspended,
    :failed
  )

  # ChannelEvent
  # The permitted channel events that are emitted for this channel
  EVENT = ruby_enum('EVENT',
    STATE.to_sym_arr + [:update]
  )

  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine
  ensure_state_machine_emits 'Ably::Models::ChannelStateChange'

  # Max number of messages to bundle in a single ProtocolMessage
  MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50

  # {Ably::Realtime::Client} associated with this channel
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Channel name
  # @return [String]
  attr_reader :name

  # Channel options configured for this channel, see {#initialize} for channel_options
  # @return [Hash]
  attr_reader :options

  # When a channel failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # The Channel manager responsible for attaching, detaching and handling failures for this channel
  # @return [Ably::Realtime::Channel::ChannelManager]
  # @api private
  attr_reader :manager

  # Serial number assigned to this channel when it was attached
  # @return [Integer]
  # @api private
  attr_reader :attached_serial

  # Initialize a new Channel object
  #
  # @param  client [Ably::Rest::Client]
  # @param  name [String] The name of the channel
  # @param  channel_options [Hash]     Channel options, currently reserved for Encryption options
  # @option channel_options [Hash,Ably::Models::CipherParams]   :cipher   A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional.  See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
  #
  def initialize(client, name, channel_options = {})
    name = ensure_utf_8(:name, name)

    update_options channel_options
    @client        = client
    @name          = name
    @queue         = []

    @state_machine = ChannelStateMachine.new(self)
    @state         = STATE(state_machine.current_state)
    @manager       = ChannelManager.new(self, client.connection)

    setup_event_handlers
    setup_presence
  end

  # Publish one or more messages to the channel.
  #
  # When publishing a message, if the channel is not attached, the channel is implicitly attached
  #
  # @param name [String, Array<Ably::Models::Message|Hash>, nil]   The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
  # @param data [String, ByteArray, nil]   The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument
  # @param attributes [Hash, nil]   Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
  #
  # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
  #
  # @example
  #   # Publish a single message
  #   channel.publish 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', { x: 1, y: 2 } },
  #     { name: 'click', { x: 2, y: 3 } }
  #   ]
  #   channel.publish messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', { x: 1, y: 2 })
  #     Ably::Models::Message(name: 'click', { x: 2, y: 3 })
  #   ]
  #   channel.publish messages
  #
  #   channel.publish('click', 'body') do |message|
  #     puts "#{message.name} event received with #{message.data}"
  #   end
  #
  #   channel.publish('click', 'body').errback do |error, message|
  #     puts "#{message.name} was not received, error #{error.message}"
  #   end
  #
  def publish(name, data = nil, attributes = {}, &success_block)
    if detached? || detaching? || failed?
      error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    if !connection.can_publish_messages?
      error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    messages = if name.kind_of?(Enumerable)
      name
    else
      name = ensure_utf_8(:name, name, allow_nil: true)
      ensure_supported_payload data
      [{ name: name, data: data }.merge(attributes)]
    end

    queue_messages(messages).tap do |deferrable|
      deferrable.callback(&success_block) if block_given?
    end
  end

  # Subscribe to messages matching providing event name, or all messages if event name not provided.
  #
  # When subscribing to messages, if the channel is not attached, the channel is implicitly attached
  #
  # @param names [String] The event name of the message to subscribe to if provided.  Defaults to all events.
  # @yield [Ably::Models::Message] For each message received, the block is called
  #
  # @return [void]
  #
  def subscribe(*names, &callback)
    attach unless attached? || attaching?
    super
  end

  # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided.
  # If a block is not provided, all subscriptions will be unsubscribed
  #
  # @param names [String] The event name of the message to subscribe to if provided.  Defaults to all events.
  #
  # @return [void]
  #
  def unsubscribe(*names, &callback)
    super
  end

  # Attach to this channel, and call the block if provided when attached.
  # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon
  # to need to call attach explicitly.
  #
  # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
  #
  def attach(&success_block)
    if connection.closing? || connection.closed? || connection.suspended? || connection.failed?
      error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}")
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    if !attached?
      if detaching?
        # Let the pending operation complete (#RTL4h)
        once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) }
      else
        transition_state_machine :attaching if can_transition_to?(:attaching)
      end
    end

    deferrable_for_state_change_to(STATE.Attached, &success_block)
  end

  # Detach this channel, and call the block if provided when in a Detached or Failed state
  #
  # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state
  # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
  #
  def detach(&success_block)
    if initialized?
      success_block.call if block_given?
      return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger)
    end

    if failed? || connection.closing? || connection.failed?
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching))
    end

    if !detached?
      if attaching?
        # Let the pending operation complete (#RTL5i)
        once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) }
      elsif can_transition_to?(:detaching)
        transition_state_machine :detaching
      else
        transition_state_machine! :detached
      end
    end

    deferrable_for_state_change_to(STATE.Detached, &success_block)
  end

  # Presence object for this Channel.  This controls this client's
  # presence on the channel and may also be used to obtain presence information
  # and change events for other members of the channel.
  #
  # @return {Ably::Realtime::Presence}
  #
  def presence
    @presence
  end

  # Return the message history of the channel
  #
  # If the channel is attached, you can retrieve messages published on the channel before the
  # channel was attached with the option <tt>until_attach: true</tt>.  This is useful when a developer
  # wishes to display historical messages with the guarantee that no messages have been missed since attach.
  #
  # @param (see Ably::Rest::Channel#history)
  # @option options (see Ably::Rest::Channel#history)
  # @option options [Boolean]  :until_attach  When true, the history request will be limited only to messages published before this channel was attached. Channel must be attached
  #
  # @yield [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}.
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def history(options = {}, &callback)
    if options.delete(:until_attach)
      unless attached?
        error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' )
        return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
      end
      options[:from_serial] = attached_serial
    end

    async_wrap(callback) do
      rest_channel.history(options.merge(async_blocking_operations: true))
    end
  end

  # @!attribute [r] __incoming_msgbus__
  # @return [Ably::Util::PubSub] Client library internal channel incoming message bus
  # @api private
  def __incoming_msgbus__
    @__incoming_msgbus__ ||= Ably::Util::PubSub.new(
      coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) }
    )
  end

  # @api private
  def set_channel_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # @api private
  def set_attached_serial(serial)
    @attached_serial = serial
  end

  # @api private
  def update_options(channel_options)
    @options = channel_options.clone.freeze
  end

  # Used by {Ably::Modules::StateEmitter} to debug state changes
  # @api private
  def logger
    client.logger
  end

  # Internal queue used for messages published that cannot yet be enqueued on the connection
  # @api private
  def __queue__
    @queue
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private
  def setup_event_handlers
    __incoming_msgbus__.subscribe(:message) do |message|
      message.decode(client.encoders, options) do |encode_error, error_message|
        client.logger.error error_message
      end
      emit_message message.name, message
    end

    unsafe_on(STATE.Attached) do
      process_queue
    end
  end

  # Queue messages and process queue if channel is attached.
  # If channel is not yet attached, attempt to attach it before the message queue is processed.
  # @return [Ably::Util::SafeDeferrable]
  def queue_messages(raw_messages)
    messages = Array(raw_messages).map do |raw_msg|
      create_message(raw_msg).tap do |message|
        next if message.client_id.nil?
        if message.client_id == '*'
          raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
        end
        if message.client_id && !message.client_id.kind_of?(String)
          raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages')
        end
        unless client.auth.can_assume_client_id?(message.client_id)
          raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
        end
      end
    end

    __queue__.push(*messages)

    if attached?
      process_queue
    else
      attach
    end

    if messages.count == 1
      # A message is a Deferrable so, if publishing only one message, simply return that Deferrable
      messages.first
    else
      deferrable_for_multiple_messages(messages)
    end
  end

  # A deferrable object that calls the success callback once all messages are delivered
  # If any message fails, the errback is called immediately
  # Only one callback or errback is ever called i.e. if a group of messages all fail, only once
  # errback will be invoked
  def deferrable_for_multiple_messages(messages)
    expected_deliveries = messages.count
    actual_deliveries = 0
    failed = false

    Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
      messages.each do |message|
        message.callback do
          next if failed
          actual_deliveries += 1
          deferrable.succeed messages if actual_deliveries == expected_deliveries
        end
        message.errback do |error|
          next if failed
          failed = true
          deferrable.fail error, message
        end
      end
    end
  end

  def messages_in_queue?
    !__queue__.empty?
  end

  # Move messages from Channel Queue into Outgoing Connection Queue
  def process_queue
    condition = -> { attached? && messages_in_queue? }
    non_blocking_loop_while(condition) do
      send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
    end
  end

  def send_messages_within_protocol_message(messages)
    connection.send_protocol_message(
      action:   Ably::Models::ProtocolMessage::ACTION.Message.to_i,
      channel:  name,
      messages: messages
    )
  end

  def create_message(message)
    Ably::Models::Message(message.dup).tap do |msg|
      msg.encode(client.encoders, options) do |encode_error, error_message|
        client.logger.error error_message
      end
    end
  end

  def rest_channel
    client.rest_client.channel(name)
  end

  def connection
    client.connection
  end

  def setup_presence
    @presence ||= Presence.new(self)
  end
end

Instance Method Details

#__queue__Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal queue used for messages published that cannot yet be enqueued on the connection



326
327
328
# File 'lib/ably/realtime/channel.rb', line 326

def __queue__
  @queue
end

#attach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable

Attach to this channel, and call the block if provided when attached. Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon to need to call attach explicitly.

Yields:

Returns:



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/ably/realtime/channel.rb', line 204

def attach(&success_block)
  if connection.closing? || connection.closed? || connection.suspended? || connection.failed?
    error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  if !attached?
    if detaching?
      # Let the pending operation complete (#RTL4h)
      once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) }
    else
      transition_state_machine :attaching if can_transition_to?(:attaching)
    end
  end

  deferrable_for_state_change_to(STATE.Attached, &success_block)
end

#clear_error_reasonObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



304
305
306
# File 'lib/ably/realtime/channel.rb', line 304

def clear_error_reason
  @error_reason = nil
end

#detach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable

Detach this channel, and call the block if provided when in a Detached or Failed state

Yields:

Returns:



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/ably/realtime/channel.rb', line 227

def detach(&success_block)
  if initialized?
    success_block.call if block_given?
    return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger)
  end

  if failed? || connection.closing? || connection.failed?
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching))
  end

  if !detached?
    if attaching?
      # Let the pending operation complete (#RTL5i)
      once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) }
    elsif can_transition_to?(:detaching)
      transition_state_machine :detaching
    else
      transition_state_machine! :detached
    end
  end

  deferrable_for_state_change_to(STATE.Detached, &success_block)
end

#history(options = {}) {|Ably::Models::PaginatedResult<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable

Return the message history of the channel

If the channel is attached, you can retrieve messages published on the channel before the channel was attached with the option until_attach: true. This is useful when a developer wishes to display historical messages with the guarantee that no messages have been missed since attach.

Parameters:

  • options (Hash) (defaults to: {})

    the options for the message history request

Options Hash (options):

  • :until_attach (Boolean)

    When true, the history request will be limited only to messages published before this channel was attached. Channel must be attached

  • :start (Integer, Time)

    Ensure earliest time or millisecond since epoch for any messages retrieved is :start

  • :end (Integer, Time)

    Ensure latest time or millisecond since epoch for any messages retrieved is :end

  • :direction (Symbol)

    :forwards or :backwards, defaults to :backwards

  • :limit (Integer)

    Maximum number of messages to retrieve up to 1,000, defaults to 100

Yields:

Returns:



275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/ably/realtime/channel.rb', line 275

def history(options = {}, &callback)
  if options.delete(:until_attach)
    unless attached?
      error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' )
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end
    options[:from_serial] = attached_serial
  end

  async_wrap(callback) do
    rest_channel.history(options.merge(async_blocking_operations: true))
  end
end

#loggerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Used by Modules::StateEmitter to debug state changes



320
321
322
# File 'lib/ably/realtime/channel.rb', line 320

def logger
  client.logger
end

#presenceAbly::Realtime::Presence

Presence object for this Channel. This controls this client’s presence on the channel and may also be used to obtain presence information and change events for other members of the channel.



257
258
259
# File 'lib/ably/realtime/channel.rb', line 257

def presence
  @presence
end

#publish(name, data = nil, attributes = {}) {|Ably::Models::Message, Array<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable

Publish one or more messages to the channel.

When publishing a message, if the channel is not attached, the channel is implicitly attached

Examples:

# Publish a single message
channel.publish 'click', { x: 1, y: 2 }

# Publish an array of message Hashes
messages = [
  { name: 'click', { x: 1, y: 2 } },
  { name: 'click', { x: 2, y: 3 } }
]
channel.publish messages

# Publish an array of Ably::Models::Message objects
messages = [
  Ably::Models::Message(name: 'click', { x: 1, y: 2 })
  Ably::Models::Message(name: 'click', { x: 2, y: 3 })
]
channel.publish messages

channel.publish('click', 'body') do |message|
  puts "#{message.name} event received with #{message.data}"
end

channel.publish('click', 'body').errback do |error, message|
  puts "#{message.name} was not received, error #{error.message}"
end

Parameters:

  • name (String, Array<Ably::Models::Message|Hash>, nil)

    The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with :name and :data pairs

  • data (String, ByteArray, nil) (defaults to: nil)

    The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument

  • attributes (Hash, nil) (defaults to: {})

    Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string

Yields:

Returns:



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/ably/realtime/channel.rb', line 148

def publish(name, data = nil, attributes = {}, &success_block)
  if detached? || detaching? || failed?
    error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  if !connection.can_publish_messages?
    error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  messages = if name.kind_of?(Enumerable)
    name
  else
    name = ensure_utf_8(:name, name, allow_nil: true)
    ensure_supported_payload data
    [{ name: name, data: data }.merge(attributes)]
  end

  queue_messages(messages).tap do |deferrable|
    deferrable.callback(&success_block) if block_given?
  end
end

#set_attached_serial(serial) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



309
310
311
# File 'lib/ably/realtime/channel.rb', line 309

def set_attached_serial(serial)
  @attached_serial = serial
end

#set_channel_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



299
300
301
# File 'lib/ably/realtime/channel.rb', line 299

def set_channel_error_reason(error)
  @error_reason = error
end

#subscribe(*names) {|Ably::Models::Message| ... } ⇒ void

This method returns an undefined value.

Subscribe to messages matching providing event name, or all messages if event name not provided.

When subscribing to messages, if the channel is not attached, the channel is implicitly attached

Parameters:

  • names (String)

    The event name of the message to subscribe to if provided. Defaults to all events.

Yields:



181
182
183
184
# File 'lib/ably/realtime/channel.rb', line 181

def subscribe(*names, &callback)
  attach unless attached? || attaching?
  super
end

#unsubscribe(*names, &callback) ⇒ void

This method returns an undefined value.

Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. If a block is not provided, all subscriptions will be unsubscribed

Parameters:

  • names (String)

    The event name of the message to subscribe to if provided. Defaults to all events.



193
194
195
# File 'lib/ably/realtime/channel.rb', line 193

def unsubscribe(*names, &callback)
  super
end

#update_options(channel_options) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



314
315
316
# File 'lib/ably/realtime/channel.rb', line 314

def update_options(channel_options)
  @options = channel_options.clone.freeze
end