Class: Lightstreamer::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/lightstreamer/subscription.rb

Overview

This class manages a subscription that can be used to stream data from a Session. Subscriptions should always be created using Lightstreamer::Session#build_subscription. Subscriptions start receiving data after #start is called, and streaming subscription data can be consumed by registering an asynchronous data callback using #on_data, or by polling using #item_data.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, options) ⇒ Subscription

Initializes a new Lightstreamer subscription with the specified options.

Parameters:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/lightstreamer/subscription.rb', line 56

def initialize(session, options)
  @session = session

  @items = options.fetch(:items)
  @fields = options.fetch(:fields)
  @mode = options.fetch(:mode).to_sym
  @adapter = options[:adapter]
  @selector = options[:selector]
  @maximum_update_frequency = sanitize_frequency options[:maximum_update_frequency]

  @data_mutex = Mutex.new

  clear_data
  clear_callbacks
end

Instance Attribute Details

#activeBoolean (readonly)

Whether this subscription is currently started and actively streaming data. See #start and #stop for details.

Returns:

  • (Boolean)


48
49
50
# File 'lib/lightstreamer/subscription.rb', line 48

def active
  @active
end

#adapterString? (readonly)

The name of the data adapter from the Lightstreamer session’s adapter set that should be used, or nil to use the default data adapter.

Returns:

  • (String, nil)


31
32
33
# File 'lib/lightstreamer/subscription.rb', line 31

def adapter
  @adapter
end

#fieldsArray (readonly)

The names of the fields to subscribe to on the items.

Returns:

  • (Array)


20
21
22
# File 'lib/lightstreamer/subscription.rb', line 20

def fields
  @fields
end

#itemsArray (readonly)

The names of the items to subscribe to.

Returns:

  • (Array)


15
16
17
# File 'lib/lightstreamer/subscription.rb', line 15

def items
  @items
end

#maximum_update_frequencyFloat, :unfiltered

The maximum number of updates this subscription should receive per second. If this is set to zero, which is the default, then there is no limit on the update frequency. If set to :unfiltered then unfiltered streaming will be used for this subscription and it is possible for overflows to occur (see #on_overflow).

Returns:

  • (Float, :unfiltered)


43
44
45
# File 'lib/lightstreamer/subscription.rb', line 43

def maximum_update_frequency
  @maximum_update_frequency
end

#mode:distinct, :merge (readonly)

The operation mode of this subscription.

Returns:

  • (:distinct, :merge)


25
26
27
# File 'lib/lightstreamer/subscription.rb', line 25

def mode
  @mode
end

#selectorString? (readonly)

The selector for table items.

Returns:

  • (String, nil)


36
37
38
# File 'lib/lightstreamer/subscription.rb', line 36

def selector
  @selector
end

#sessionSession (readonly)

The session that this subscription is associated with.

Returns:



10
11
12
# File 'lib/lightstreamer/subscription.rb', line 10

def session
  @session
end

Instance Method Details

#clear_callbacksObject

Removes all #on_data and #on_overflow callbacks present on this subscription.



177
178
179
# File 'lib/lightstreamer/subscription.rb', line 177

def clear_callbacks
  @data_mutex.synchronize { @callbacks = { on_data: [], on_overflow: [], on_end_of_snapshot: [] } }
end

#clear_dataObject

Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.



144
145
146
# File 'lib/lightstreamer/subscription.rb', line 144

def clear_data
  @data = (0...items.size).map { {} }
end

#idFixnum

Returns this subscription’s unique identification number.

Returns:

  • (Fixnum)


77
78
79
# File 'lib/lightstreamer/subscription.rb', line 77

def id
  @id ||= ID_GENERATOR.next
end

#item_data(item_name) ⇒ Hash

Returns a copy of the current data of one of this subscription’s items.

Parameters:

  • item_name (String)

    The name of the item to return the current data for.

Returns:

  • (Hash)

    A copy of the item data.

Raises:

  • (ArgumentError)


186
187
188
189
190
191
# File 'lib/lightstreamer/subscription.rb', line 186

def item_data(item_name)
  index = @items.index item_name
  raise ArgumentError, 'Unknown item' unless index

  @data_mutex.synchronize { @data[index].dup }
end

#on_data(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, item_data, new_values|`. If #mode is :distinct then the values of item_data and new_values will be the same.

Parameters:

  • callback (Proc)

    The callback that is to be run when new data arrives.



154
155
156
# File 'lib/lightstreamer/subscription.rb', line 154

def on_data(&callback)
  @data_mutex.synchronize { @callbacks[:on_data] << callback }
end

#on_end_of_snapshot(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name|`.

Parameters:

  • callback (Proc)

    The callback that is to be run when an overflow is reported for this subscription.



172
173
174
# File 'lib/lightstreamer/subscription.rb', line 172

def on_end_of_snapshot(&callback)
  @data_mutex.synchronize { @callbacks[:on_end_of_snapshot] << callback }
end

#on_overflow(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, overflow_size|`.

Parameters:

  • callback (Proc)

    The callback that is to be run when an overflow is reported for this subscription.



163
164
165
# File 'lib/lightstreamer/subscription.rb', line 163

def on_overflow(&callback)
  @data_mutex.synchronize { @callbacks[:on_overflow] << callback }
end

#process_stream_data(line) ⇒ Boolean

Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.

Parameters:

  • line (String)

    The line of stream data to process.

Returns:

  • (Boolean)

    Whether the passed line of stream data was relevant to this subscription and was successfully processed by it.



215
216
217
218
219
# File 'lib/lightstreamer/subscription.rb', line 215

def process_stream_data(line)
  return true if process_update_message UpdateMessage.parse(line, id, items, fields)
  return true if process_overflow_message OverflowMessage.parse(line, id, items)
  return true if process_end_of_snapshot_message EndOfSnapshotMessage.parse(line, id, items)
end

#set_item_data(item_name, item_data) ⇒ Object

Sets the current data for the item with the specified name.

Parameters:

  • item_name (String)

    The name of the item to set the data for.

  • item_data (Hash)

    The new data for the item.

Raises:

  • (ArgumentError)


197
198
199
200
201
202
203
204
# File 'lib/lightstreamer/subscription.rb', line 197

def set_item_data(item_name, item_data)
  index = @items.index item_name
  raise ArgumentError, 'Unknown item' unless index

  raise ArgumentError, 'Item data must be a hash' unless item_data.is_a? Hash

  @data_mutex.synchronize { @data[index] = item_data.dup }
end

#start(options = {}) ⇒ Object

Starts streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.

Parameters:

  • options (Hash) (defaults to: {})

    The options to start the subscription with.

Options Hash (options):

  • :silent (Boolean)

    Whether the subscription should be started in silent mode. In silent mode the subscription is initiated on the server and begins buffering incoming data, however this data will not be sent to the client for processing until #unsilence is called.

  • :snapshot (Boolean, Fixnum)

    Controls whether the server should send a snapshot of this subscription’s items. If false then the server does not send snapshot information (this is the default). If true then the server will send snapshot information if it’s available. If this subscription’s #mode is :distinct then :snapshot can also be an integer specifying the number of events the server should send as part of the snapshot. If this latter option is used then any callbacks registered with #on_end_of_snapshot will be called once the snapshot for each item is complete.



95
96
97
98
# File 'lib/lightstreamer/subscription.rb', line 95

def start(options = {})
  session.control_request(*start_control_request_args(options)) unless @active
  @active = true
end

#start_control_request_args(options = {}) ⇒ Object

Returns the arguments to pass to to Lightstreamer::Session#control_request in order ot start this subscription with the given options.

Parameters:

  • options (Hash) (defaults to: {})

    The options to start the subscription with.



106
107
108
109
110
111
112
113
114
# File 'lib/lightstreamer/subscription.rb', line 106

def start_control_request_args(options = {})
  operation = options[:silent] ? :add_silent : :add

  options = { LS_table: id, LS_mode: mode.to_s.upcase, LS_id: items, LS_schema: fields, LS_data_adapter: adapter,
              LS_requested_max_frequency: maximum_update_frequency, LS_selector: selector,
              LS_snapshot: options.fetch(:snapshot, false) }

  [operation, options]
end

#stopObject

Stops streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.



125
126
127
128
# File 'lib/lightstreamer/subscription.rb', line 125

def stop
  session.control_request :delete, LS_table: id if @active
  @active = false
end

#unsilenceObject

Unsilences this subscription if it was initially started in silent mode (by passing ‘silent: true` to #start). If this subscription was not started in silent mode then this method has no effect. If an error occurs then a LightstreamerError subclass will be raised.



119
120
121
# File 'lib/lightstreamer/subscription.rb', line 119

def unsilence
  session.control_request :start, LS_table: id
end