Class: Lightstreamer::Subscription
- Inherits:
-
Object
- Object
- Lightstreamer::Subscription
- Defined in:
- lib/lightstreamer/subscription.rb
Overview
This class manages a subscription that can be used to stream data from a Session. Subscriptions should always be created using Lightstreamer::Session#build_subscription. Subscriptions start receiving data after #start is called, and streaming subscription data can be consumed by registering an asynchronous data callback using #on_data, or by polling using #item_data.
Instance Attribute Summary collapse
-
#active ⇒ Boolean
readonly
Whether this subscription is currently started and actively streaming data.
-
#data_adapter ⇒ String?
readonly
The name of the data adapter from the Lightstreamer session’s adapter set to use, or ‘nil` to use the default data adapter.
-
#fields ⇒ Array
readonly
The names of the fields to subscribe to on the items.
-
#items ⇒ Array
readonly
The names of the items to subscribe to.
-
#maximum_update_frequency ⇒ Float, :unfiltered
The maximum number of updates this subscription should receive per second.
-
#mode ⇒ :command, ...
readonly
The operation mode of this subscription.
-
#selector ⇒ String?
readonly
The selector for table items.
-
#session ⇒ Session
readonly
The session that this subscription is associated with.
Instance Method Summary collapse
-
#clear_callbacks ⇒ Object
Removes all #on_data, #on_overflow and #on_end_of_snapshot callbacks present on this subscription.
-
#clear_data ⇒ Object
Clears all current data stored for this subscription.
-
#id ⇒ Fixnum
Returns this subscription’s unique identification number.
-
#initialize(session, options) ⇒ Subscription
constructor
Initializes a new Lightstreamer subscription with the specified options.
-
#item_data(item_name) ⇒ Hash, ...
Returns a copy of the current data of one of this subscription’s items.
-
#on_data(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives.
-
#on_end_of_snapshot(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription.
-
#on_overflow(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription.
-
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription.
-
#set_item_data(item_name, item_data) ⇒ Object
Sets the current data for the item with the specified name.
-
#start(options = {}) ⇒ Object
Starts streaming data for this Lightstreamer subscription.
-
#start_control_request_args(options = {}) ⇒ Object
Returns the arguments to pass to to Lightstreamer::Session#control_request in order to start this subscription with the given options.
-
#stop ⇒ Object
Stops streaming data for this Lightstreamer subscription.
-
#unsilence ⇒ Object
Unsilences this subscription if it was initially started in silent mode by passing ‘silent: true` to #start.
Constructor Details
#initialize(session, options) ⇒ Subscription
Initializes a new Lightstreamer subscription with the specified options.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/lightstreamer/subscription.rb', line 58 def initialize(session, ) @mutex = Mutex.new @session = session @items = .fetch(:items) @fields = .fetch(:fields) @mode = .fetch(:mode).to_sym @data_adapter = [:data_adapter] @selector = [:selector] @maximum_update_frequency = sanitize_frequency [:maximum_update_frequency] clear_data clear_callbacks end |
Instance Attribute Details
#active ⇒ Boolean (readonly)
50 51 52 |
# File 'lib/lightstreamer/subscription.rb', line 50 def active @active end |
#data_adapter ⇒ String? (readonly)
The name of the data adapter from the Lightstreamer session’s adapter set to use, or ‘nil` to use the default data adapter.
32 33 34 |
# File 'lib/lightstreamer/subscription.rb', line 32 def data_adapter @data_adapter end |
#fields ⇒ Array (readonly)
The names of the fields to subscribe to on the items.
20 21 22 |
# File 'lib/lightstreamer/subscription.rb', line 20 def fields @fields end |
#items ⇒ Array (readonly)
The names of the items to subscribe to.
15 16 17 |
# File 'lib/lightstreamer/subscription.rb', line 15 def items @items end |
#maximum_update_frequency ⇒ Float, :unfiltered
The maximum number of updates this subscription should receive per second. If this is set to zero, which is the default, then there is no limit on the update frequency. If set to ‘:unfiltered` then unfiltered streaming will be used for this subscription and it is possible for overflows to occur (see #on_overflow). If #mode is `:raw` then the maximum update frequency is treated as `:unfiltered` regardless of its actual value.
45 46 47 |
# File 'lib/lightstreamer/subscription.rb', line 45 def maximum_update_frequency @maximum_update_frequency end |
#mode ⇒ :command, ... (readonly)
The operation mode of this subscription. The four supported operation modes are: ‘:command`, `:distinct`, `:merge` and `:raw`. See the Lightstreamer documentation for details on the different modes.
26 27 28 |
# File 'lib/lightstreamer/subscription.rb', line 26 def mode @mode end |
#selector ⇒ String? (readonly)
The selector for table items.
37 38 39 |
# File 'lib/lightstreamer/subscription.rb', line 37 def selector @selector end |
#session ⇒ Session (readonly)
The session that this subscription is associated with.
10 11 12 |
# File 'lib/lightstreamer/subscription.rb', line 10 def session @session end |
Instance Method Details
#clear_callbacks ⇒ Object
Removes all #on_data, #on_overflow and #on_end_of_snapshot callbacks present on this subscription.
215 216 217 |
# File 'lib/lightstreamer/subscription.rb', line 215 def clear_callbacks @mutex.synchronize { @callbacks = { on_data: [], on_overflow: [], on_end_of_snapshot: [] } } end |
#clear_data ⇒ Object
Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.
149 150 151 |
# File 'lib/lightstreamer/subscription.rb', line 149 def clear_data @mutex.synchronize { @data = (0...items.size).map { SubscriptionItemData.new } } end |
#id ⇒ Fixnum
Returns this subscription’s unique identification number.
78 79 80 |
# File 'lib/lightstreamer/subscription.rb', line 78 def id @id ||= ID_GENERATOR.next end |
#item_data(item_name) ⇒ Hash, ...
Returns a copy of the current data of one of this subscription’s items. If #mode is ‘:merge` then the returned object will be a hash of the item’s state, if it is ‘:command` then an array of row data for the item will be returned, and if it is `:distinct` or `:raw` then just the most recent update received for the item will be returned. The return value will be `nil` if no data for the item has been set or been received.
161 162 163 164 165 166 |
# File 'lib/lightstreamer/subscription.rb', line 161 def item_data(item_name) index = @items.index item_name raise ArgumentError, 'Unknown item' unless index @mutex.synchronize { @data[index].data && @data[index].data.dup } end |
#on_data(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, item_data, new_data|`. If #mode is `:distinct` or `:raw` then `item_data` and `new_data` will be the same.
188 189 190 |
# File 'lib/lightstreamer/subscription.rb', line 188 def on_data(&callback) @mutex.synchronize { @callbacks[:on_data] << callback } end |
#on_end_of_snapshot(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription. End-of-snapshot notifications are only sent when #mode is ‘:command` or `:distinct` and `snapshot: true` was passed to #start. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name|`.
210 211 212 |
# File 'lib/lightstreamer/subscription.rb', line 210 def on_end_of_snapshot(&callback) @mutex.synchronize { @callbacks[:on_end_of_snapshot] << callback } end |
#on_overflow(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription. This is only relevant when this subscription’s #mode is ‘:command` or `:raw`, or if #maximum_update_frequency is `:unfiltered`. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name, overflow_size|`.
199 200 201 |
# File 'lib/lightstreamer/subscription.rb', line 199 def on_overflow(&callback) @mutex.synchronize { @callbacks[:on_overflow] << callback } end |
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.
227 228 229 230 231 |
# File 'lib/lightstreamer/subscription.rb', line 227 def process_stream_data(line) return true if UpdateMessage.parse(line, id, items, fields) return true if OverflowMessage.parse(line, id, items) return true if EndOfSnapshotMessage.parse(line, id, items) end |
#set_item_data(item_name, item_data) ⇒ Object
Sets the current data for the item with the specified name. This is only allowed when #mode is ‘:command` or `:merge`. Raises an exception if the specified item name or item data is invalid.
175 176 177 178 179 180 |
# File 'lib/lightstreamer/subscription.rb', line 175 def set_item_data(item_name, item_data) index = @items.index item_name raise ArgumentError, 'Unknown item' unless index @mutex.synchronize { @data[index].set_data item_data, mode } end |
#start(options = {}) ⇒ Object
Starts streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.
97 98 99 100 101 102 |
# File 'lib/lightstreamer/subscription.rb', line 97 def start( = {}) return if @active session.control_request(*start_control_request_args()) @active = true end |
#start_control_request_args(options = {}) ⇒ Object
Returns the arguments to pass to to Lightstreamer::Session#control_request in order to start this subscription with the given options.
110 111 112 113 114 115 116 117 118 |
# File 'lib/lightstreamer/subscription.rb', line 110 def start_control_request_args( = {}) operation = [:silent] ? :add_silent : :add = { LS_table: id, LS_mode: mode.to_s.upcase, LS_id: items, LS_schema: fields, LS_selector: selector, LS_data_adapter: data_adapter, LS_requested_max_frequency: maximum_update_frequency, LS_snapshot: .fetch(:snapshot, false) } [operation, ] end |
#stop ⇒ Object
Stops streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.
129 130 131 132 |
# File 'lib/lightstreamer/subscription.rb', line 129 def stop session.control_request :delete, LS_table: id if @active @active = false end |
#unsilence ⇒ Object
Unsilences this subscription if it was initially started in silent mode by passing ‘silent: true` to #start. If this subscription was not started in silent mode then this method has no effect. If an error occurs then a LightstreamerError subclass will be raised.
123 124 125 |
# File 'lib/lightstreamer/subscription.rb', line 123 def unsilence session.control_request :start, LS_table: id end |