Class: Lightstreamer::Subscription

Inherits:
Object
Defined in:
lib/lightstreamer/subscription.rb

Overview

This class manages a subscription that can be used to stream data from a Session. Subscriptions should always be created using Lightstreamer::Session#build_subscription. Subscriptions start receiving data after #start is called, and streaming subscription data can be consumed by registering an asynchronous data callback using #on_data, or by polling using #item_data.

Constructor Details

#initialize(session, options) ⇒ Subscription

Initializes a new Lightstreamer subscription with the specified options.

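Parameters:

  • session (Session)

    The session this subscription belongs to.

  • options (Hash)

    The options for the subscription. `:items`, `:fields` and `:mode` are required; `:data_adapter`, `:selector` and `:maximum_update_frequency` are optional.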



# File 'lib/lightstreamer/subscription.rb', line 53

def initialize(session, options)
  @mutex = Mutex.new

  @session = WeakRef.new session
  @items = options.fetch(:items)
  @fields = options.fetch(:fields)
  @mode = options.fetch(:mode).to_sym
  @data_adapter = options[:data_adapter]
  @selector = options[:selector]
  @maximum_update_frequency = sanitize_frequency options[:maximum_update_frequency]

  clear_data
  clear_callbacks
end
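
The constructor reads its required options with `Hash#fetch` and its optional ones with `Hash#[]`, so a missing `:items`, `:fields` or `:mode` raises `KeyError`, while a missing `:data_adapter` or `:selector` is simply `nil`. The following is a minimal standalone sketch of that behaviour. `read_subscription_options` is a hypothetical helper written for illustration, not part of the library, and the item and field names are placeholders.

```ruby
# Hypothetical helper mirroring how the constructor reads its options.
def read_subscription_options(options)
  {
    items: options.fetch(:items),         # required: KeyError when absent
    fields: options.fetch(:fields),       # required: KeyError when absent
    mode: options.fetch(:mode).to_sym,    # required; a String becomes a Symbol
    data_adapter: options[:data_adapter], # optional: nil when absent
    selector: options[:selector]          # optional: nil when absent
  }
end

parsed = read_subscription_options({ items: %w[item1 item2], fields: %w[bid offer], mode: 'merge' })
puts parsed[:mode].inspect
puts parsed[:data_adapter].inspect

# Leaving out a required key fails fast instead of producing a half-built object.
begin
  read_subscription_options({ fields: %w[bid offer], mode: :merge })
rescue KeyError => e
  puts "rejected: #{e.message}"
end
```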

Instance Attribute Details

#active ⇒ Boolean (readonly)

Whether this subscription is currently started and actively streaming data. See #start and #stop for details.

Returns:

  • (Boolean)


# File 'lib/lightstreamer/subscription.rb', line 45

def active
  @active
end

#data_adapter ⇒ String? (readonly)

The name of the data adapter from the Lightstreamer session’s adapter set to use, or `nil` to use the default data adapter.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/subscription.rb', line 27

def data_adapter
  @data_adapter
end

#fields ⇒ Array (readonly)

The names of the fields to subscribe to on the items.

Returns:

  • (Array)


# File 'lib/lightstreamer/subscription.rb', line 15

def fields
  @fields
end

#items ⇒ Array (readonly)

The names of the items to subscribe to.

Returns:

  • (Array)


# File 'lib/lightstreamer/subscription.rb', line 10

def items
  @items
end

#maximum_update_frequency ⇒ Float, :unfiltered

The maximum number of updates this subscription should receive per second. If this is set to zero, which is the default, then there is no limit on the update frequency. If set to `:unfiltered` then unfiltered streaming will be used for this subscription and it is possible for overflows to occur (see #on_overflow). If #mode is `:raw` then the maximum update frequency is treated as `:unfiltered` regardless of its actual value.

Returns:

  • (Float, :unfiltered)


# File 'lib/lightstreamer/subscription.rb', line 40

def maximum_update_frequency
  @maximum_update_frequency
end
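
The constructor passes this option through a private `sanitize_frequency` helper whose body is not shown on this page. The sketch below is one plausible normalisation that matches the documented return type (`Float` or `:unfiltered`, with zero as the default). It is an assumption for illustration rather than the library's code, and `normalize_frequency` is a hypothetical name.

```ruby
# Hypothetical stand-in for the private sanitize_frequency helper (an assumption,
# not the library's code): 'unfiltered' maps to the Symbol, everything else to a Float.
def normalize_frequency(value)
  value.to_s == 'unfiltered' ? :unfiltered : value.to_f
end

puts normalize_frequency(nil).inspect          # option omitted: nil.to_f is 0.0 (no limit)
puts normalize_frequency(2).inspect            # integers are widened to Float
puts normalize_frequency('unfiltered').inspect # a String is accepted as well as the Symbol
```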

#mode ⇒ :command, ... (readonly)

The operation mode of this subscription. The four supported operation modes are: `:command`, `:distinct`, `:merge` and `:raw`. See the Lightstreamer documentation for details on the different modes.

Returns:

  • (:command, :distinct, :merge, :raw)


# File 'lib/lightstreamer/subscription.rb', line 21

def mode
  @mode
end

#selector ⇒ String? (readonly)

The selector for table items.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/subscription.rb', line 32

def selector
  @selector
end

Class Method Details

.next_id ⇒ Object

Returns the next unique numeric subscription ID.



# File 'lib/lightstreamer/subscription.rb', line 242

def next_id
  @next_id ||= 0
  @next_id += 1
end

Instance Method Details

#after_control_request(action) ⇒ Object

Performs any required updates to this subscription’s state after a control request succeeds.



# File 'lib/lightstreamer/subscription.rb', line 231

def after_control_request(action)
  @active = true if action == :start
  @active = false if action == :stop
end

#clear_callbacks ⇒ Object

Removes all #on_data, #on_overflow and #on_end_of_snapshot callbacks present on this subscription.



# File 'lib/lightstreamer/subscription.rb', line 196

def clear_callbacks
  @mutex.synchronize { @callbacks = { on_data: [], on_overflow: [], on_end_of_snapshot: [] } }
end

#clear_data ⇒ Object

Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.



# File 'lib/lightstreamer/subscription.rb', line 129

def clear_data
  @mutex.synchronize { @data = (0...items.size).map { SubscriptionItemData.new } }
end

#control_request_options(action, options = nil) ⇒ Object

Returns the control request arguments to use to perform the specified action on this subscription.



# File 'lib/lightstreamer/subscription.rb', line 217

def control_request_options(action, options = nil)
  case action.to_sym
  when :start
    start_control_request_options options
  when :unsilence
    { LS_session: @session.session_id, LS_op: :start, LS_table: id }
  when :stop
    { LS_session: @session.session_id, LS_op: :delete, LS_table: id }
  end
end

#id ⇒ Fixnum

Returns this subscription’s unique identification number.

Returns:

  • (Fixnum)


# File 'lib/lightstreamer/subscription.rb', line 73

def id
  @id ||= self.class.next_id
end
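
Together, `.next_id` and `#id` form a lazily assigned, memoised identifier: the class-level counter only advances when an instance's `id` is read for the first time. The standalone sketch below reproduces that pattern. `Ticket` is a hypothetical class used only for illustration.

```ruby
# Hypothetical class reproducing the ID pattern used by .next_id and #id.
class Ticket
  class << self
    # Class-level counter: initialised to zero on first use, then incremented.
    def next_id
      @next_id ||= 0
      @next_id += 1
    end
  end

  # The ID is taken from the counter on first access and memoised afterwards.
  def id
    @id ||= self.class.next_id
  end
end

first = Ticket.new
second = Ticket.new
puts second.id # IDs follow the order of first access, not the order of creation
puts first.id
puts second.id # unchanged on later calls
```

As shown in the source above, the counter is not wrapped in a mutex, so the sketch makes no claim about behaviour when IDs are first read from several threads at once.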

#item_data(item_name) ⇒ Hash, ...

Returns a copy of the current data of one of this subscription’s items. If #mode is `:merge` then the returned object will be a hash of the item’s state, if it is `:command` then an array of row data for the item will be returned, and if it is `:distinct` or `:raw` then just the most recent update received for the item will be returned. The return value will be `nil` if no data for the item has been set or received.

Parameters:

  • item_name (String)

    The name of the item to return the current data for.

Returns:

  • (Hash, Array, nil)

    A copy of the item data.

Raises:

  • (ArgumentError)


# File 'lib/lightstreamer/subscription.rb', line 141

def item_data(item_name)
  index = @items.index item_name
  raise ArgumentError, 'Unknown item' unless index

  @mutex.synchronize { @data[index].data&.dup }
end
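
The `&.dup` call in the method body is what makes the return value a copy: callers can modify it without touching the subscription's stored state, and `nil` is passed through when nothing has been stored. The sketch below illustrates this with a hypothetical `ItemStore` class that stands in for the library's internal storage.

```ruby
# Hypothetical stand-in that holds one item's data behind a mutex.
class ItemStore
  def initialize(data = nil)
    @mutex = Mutex.new
    @data = data
  end

  # Returns a shallow copy of the stored data, or nil when nothing is stored.
  def snapshot
    @mutex.synchronize { @data&.dup }
  end
end

store = ItemStore.new({ bid: '1.5' })
copy = store.snapshot
copy[:bid] = 'changed'              # only the caller's copy is modified
puts store.snapshot[:bid]           # the stored value is untouched
puts ItemStore.new.snapshot.inspect # no data yet, so nil is returned
```

Ruby's `dup` is a shallow copy, so whether nested objects (for example the row hashes in `:command` mode) are also copied depends on code not shown on this page.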

#on_data(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name, item_data, new_data|`. The `item_data` argument will be an array if #mode is `:command`; for all other modes it will be a hash. Note that if #mode is `:distinct` or `:raw` then `item_data` and `new_data` will be the same.

Parameters:

  • callback (Proc)

    The callback that is to be run when new data arrives.



# File 'lib/lightstreamer/subscription.rb', line 169

def on_data(&callback)
  @mutex.synchronize { @callbacks[:on_data] << callback }
end
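
Because the block runs on a worker thread, one simple way to keep it thread-safe is to have it do nothing except push onto a `Queue`, and to consume the queue on the thread that owns the rest of the application state. The sketch below shows that hand-off. `FakeFeed` and its `publish` method are hypothetical stand-ins for the subscription and the library's worker thread, so nothing here contacts a server.

```ruby
# Hypothetical stand-in for a subscription's callback list. `publish` plays the
# role of the library's worker thread and is not part of the real API.
class FakeFeed
  def initialize
    @mutex = Mutex.new
    @callbacks = []
  end

  # Same shape as Subscription#on_data: store the block under a mutex.
  def on_data(&callback)
    @mutex.synchronize { @callbacks << callback }
  end

  # Invokes every registered block on a new thread with the four documented arguments.
  def publish(item_name, item_data, new_data)
    callbacks = @mutex.synchronize { @callbacks.dup }
    Thread.new { callbacks.each { |cb| cb.call(self, item_name, item_data, new_data) } }
  end
end

updates = Queue.new # Queue is thread-safe, so the block may push from any thread
feed = FakeFeed.new
feed.on_data { |_subscription, item_name, _item_data, new_data| updates << [item_name, new_data] }

feed.publish('item1', { bid: '1.5', offer: '1.6' }, { bid: '1.5' }).join
item_name, new_data = updates.pop # consumed on the main thread
puts "#{item_name}: #{new_data.inspect}"
```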

#on_end_of_snapshot(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription. End-of-snapshot notifications are only sent when #mode is `:command` or `:distinct` and `snapshot: true` was passed to #start. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name|`.

Parameters:

  • callback (Proc)

    The callback that is to be run when an end-of-snapshot notification is reported for this subscription.



# File 'lib/lightstreamer/subscription.rb', line 191

def on_end_of_snapshot(&callback)
  @mutex.synchronize { @callbacks[:on_end_of_snapshot] << callback }
end

#on_overflow(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription. This is only relevant when this subscription’s #mode is `:command` or `:raw`, or if #maximum_update_frequency is `:unfiltered`. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name, overflow_size|`.

Parameters:

  • callback (Proc)

    The callback that is to be run when an overflow is reported for this subscription.



# File 'lib/lightstreamer/subscription.rb', line 180

def on_overflow(&callback)
  @mutex.synchronize { @callbacks[:on_overflow] << callback }
end

#process_stream_data(line) ⇒ Boolean

Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.

Parameters:

  • line (String)

    The line of stream data to process.

Returns:

  • (Boolean)

    Whether the passed line of stream data was processed by this subscription.



# File 'lib/lightstreamer/subscription.rb', line 208

def process_stream_data(line)
  return true if process_update_message UpdateMessage.parse(line, id, items, fields)
  return true if process_overflow_message OverflowMessage.parse(line, id, items)
  return true if process_end_of_snapshot_message EndOfSnapshotMessage.parse(line, id, items)
end

#set_item_data(item_name, item_data) ⇒ Object

Sets the current data for the item with the specified name. This is only allowed when #mode is `:command` or `:merge`. Raises an exception if the specified item name or item data is invalid.

Parameters:

  • item_name (String)

    The name of the item to set the data for.

  • item_data (Hash, Array<Hash>)

    The new data for the item. If #mode is `:merge` this must be a hash. If #mode is `:command` then this must be an `Array<Hash>` and each hash entry must have a unique `:key` value.

Raises:

  • (ArgumentError)


# File 'lib/lightstreamer/subscription.rb', line 155

def set_item_data(item_name, item_data)
  index = @items.index item_name
  raise ArgumentError, 'Unknown item' unless index

  @mutex.synchronize { @data[index].set_data item_data, mode }
end
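
The two accepted data shapes can be checked before calling `set_item_data`, so that invalid input is caught in application code rather than surfacing as an `ArgumentError`. The sketch below assumes the rules stated above. `valid_command_rows?` is a hypothetical helper, not part of the library, and treating a missing `:key` as invalid is an additional assumption.

```ruby
# Hypothetical pre-check for :command data: an Array of Hashes whose :key
# values are all present and distinct. Not part of the library's API.
def valid_command_rows?(rows)
  return false unless rows.is_a?(Array) && rows.all? { |row| row.is_a?(Hash) }

  keys = rows.map { |row| row[:key] }
  keys.none?(&:nil?) && keys.uniq.size == keys.size
end

# Placeholder data in the shape expected for each mode.
merge_data = { bid: '1.5', offer: '1.6' }
command_data = [{ key: 'row1', qty: '10' }, { key: 'row2', qty: '5' }]

puts merge_data.is_a?(Hash)
puts valid_command_rows?(command_data)
puts valid_command_rows?([{ key: 'row1' }, { key: 'row1' }]) # duplicate :key values
```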

#start(options = {}) ⇒ Object

Starts streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.

Parameters:

  • options (Hash) (defaults to: {})

    The options to start the subscription with.

Options Hash (options):

  • :silent (Boolean)

    Whether the subscription should be started in silent mode. In silent mode the subscription is initiated on the server and begins buffering incoming data, but this data will not be sent to the client for processing until #unsilence is called.

  • :snapshot (Boolean, Fixnum)

    Controls whether the server should send a snapshot of this subscription’s items. The default value is `false`, which means the server will not send snapshot information. If set to `true` then the server will send snapshot information if it is available. If this subscription’s #mode is `:distinct` then `:snapshot` can also be an integer specifying the number of events the server should send as part of the snapshot. If this latter option is used, or #mode is `:command`, then any callbacks registered with #on_end_of_snapshot will be called once the snapshot for each item is complete. This option is ignored when #mode is `:raw`.



# File 'lib/lightstreamer/subscription.rb', line 92

def start(options = {})
  return if @active

  @session.control_request control_request_options(:start, options)
  after_control_request :start
end

#stop ⇒ Object

Stops streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.



# File 'lib/lightstreamer/subscription.rb', line 109

def stop
  @session.control_request control_request_options(:stop) if @active
  after_control_request :stop
end
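
Both `#start` and `#stop` are guarded by the `@active` flag, so repeated calls do not send duplicate control requests. The sketch below isolates that guard logic. `RecordingSession` and `TinySubscription` are hypothetical stand-ins, and the request hashes are placeholders rather than the real control request arguments.

```ruby
# Hypothetical session that records control requests instead of contacting a server.
class RecordingSession
  attr_reader :requests

  def initialize
    @requests = []
  end

  def control_request(options)
    @requests << options
  end
end

# Hypothetical subscription that copies only the start/stop guards shown above.
class TinySubscription
  attr_reader :active

  def initialize(session)
    @session = session
    @active = false
  end

  def start
    return if @active # already started: no second request is sent

    @session.control_request({ action: :start })
    @active = true
  end

  def stop
    @session.control_request({ action: :stop }) if @active
    @active = false
  end
end

session = RecordingSession.new
subscription = TinySubscription.new(session)
subscription.start
subscription.start # ignored while the subscription is active
subscription.stop
subscription.stop  # no request is sent once the subscription is inactive
puts session.requests.inspect
```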

#unsilence ⇒ Object

Unsilences this subscription if it was initially started in silent mode by passing `silent: true` to #start. If this subscription was not started in silent mode then this method has no effect. If an error occurs then a LightstreamerError subclass will be raised.



# File 'lib/lightstreamer/subscription.rb', line 102

def unsilence
  @session.control_request control_request_options(:unsilence)
  after_control_request :unsilence
end