Class: Lightstreamer::Subscription
- Inherits:
-
Object
- Object
- Lightstreamer::Subscription
- Defined in:
- lib/lightstreamer/subscription.rb
Overview
This class manages a subscription that can be used to stream data from a Session. Subscriptions should always be created using Lightstreamer::Session#build_subscription. Subscriptions start receiving data after #start is called, and streaming subscription data can be consumed by registering an asynchronous data callback using #on_data, or by polling using #item_data.
Instance Attribute Summary collapse
-
#active ⇒ Boolean
readonly
Whether this subscription is currently started and actively streaming data.
-
#adapter ⇒ String?
readonly
The name of the data adapter from the Lightstreamer session’s adapter set that should be used, or
nilto use the default data adapter. -
#fields ⇒ Array
readonly
The names of the fields to subscribe to on the items.
-
#items ⇒ Array
readonly
The names of the items to subscribe to.
-
#maximum_update_frequency ⇒ Float, :unfiltered
The maximum number of updates this subscription should receive per second.
-
#mode ⇒ :distinct, :merge
readonly
The operation mode of this subscription.
-
#selector ⇒ String?
readonly
The selector for table items.
-
#session ⇒ Session
readonly
The session that this subscription is associated with.
Instance Method Summary collapse
-
#clear_callbacks ⇒ Object
Removes all #on_data and #on_overflow callbacks present on this subscription.
-
#clear_data ⇒ Object
Clears all current data stored for this subscription.
-
#id ⇒ Fixnum
Returns this subscription’s unique identification number.
-
#initialize(session, options) ⇒ Subscription
constructor
Initializes a new Lightstreamer subscription with the specified options.
-
#item_data(item_name) ⇒ Hash
Returns a copy of the current data of one of this subscription’s items.
-
#on_data(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives.
-
#on_end_of_snapshot(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription.
-
#on_overflow(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription.
-
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription.
-
#set_item_data(item_name, item_data) ⇒ Object
Sets the current data for the item with the specified name.
-
#start(options = {}) ⇒ Object
Starts streaming data for this Lightstreamer subscription.
-
#start_control_request_args(options = {}) ⇒ Object
Returns the arguments to pass to to Lightstreamer::Session#control_request in order ot start this subscription with the given options.
-
#stop ⇒ Object
Stops streaming data for this Lightstreamer subscription.
-
#unsilence ⇒ Object
Unsilences this subscription if it was initially started in silent mode (by passing ‘silent: true` to #start).
Constructor Details
#initialize(session, options) ⇒ Subscription
Initializes a new Lightstreamer subscription with the specified options.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/lightstreamer/subscription.rb', line 56 def initialize(session, ) @session = session @items = .fetch(:items) @fields = .fetch(:fields) @mode = .fetch(:mode).to_sym @adapter = [:adapter] @selector = [:selector] @maximum_update_frequency = sanitize_frequency [:maximum_update_frequency] @data_mutex = Mutex.new clear_data clear_callbacks end |
Instance Attribute Details
#active ⇒ Boolean (readonly)
48 49 50 |
# File 'lib/lightstreamer/subscription.rb', line 48 def active @active end |
#adapter ⇒ String? (readonly)
The name of the data adapter from the Lightstreamer session’s adapter set that should be used, or nil to use the default data adapter.
31 32 33 |
# File 'lib/lightstreamer/subscription.rb', line 31 def adapter @adapter end |
#fields ⇒ Array (readonly)
The names of the fields to subscribe to on the items.
20 21 22 |
# File 'lib/lightstreamer/subscription.rb', line 20 def fields @fields end |
#items ⇒ Array (readonly)
The names of the items to subscribe to.
15 16 17 |
# File 'lib/lightstreamer/subscription.rb', line 15 def items @items end |
#maximum_update_frequency ⇒ Float, :unfiltered
The maximum number of updates this subscription should receive per second. If this is set to zero, which is the default, then there is no limit on the update frequency. If set to :unfiltered then unfiltered streaming will be used for this subscription and it is possible for overflows to occur (see #on_overflow).
43 44 45 |
# File 'lib/lightstreamer/subscription.rb', line 43 def maximum_update_frequency @maximum_update_frequency end |
#mode ⇒ :distinct, :merge (readonly)
The operation mode of this subscription.
25 26 27 |
# File 'lib/lightstreamer/subscription.rb', line 25 def mode @mode end |
#selector ⇒ String? (readonly)
The selector for table items.
36 37 38 |
# File 'lib/lightstreamer/subscription.rb', line 36 def selector @selector end |
#session ⇒ Session (readonly)
The session that this subscription is associated with.
10 11 12 |
# File 'lib/lightstreamer/subscription.rb', line 10 def session @session end |
Instance Method Details
#clear_callbacks ⇒ Object
Removes all #on_data and #on_overflow callbacks present on this subscription.
177 178 179 |
# File 'lib/lightstreamer/subscription.rb', line 177 def clear_callbacks @data_mutex.synchronize { @callbacks = { on_data: [], on_overflow: [], on_end_of_snapshot: [] } } end |
#clear_data ⇒ Object
Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.
144 145 146 |
# File 'lib/lightstreamer/subscription.rb', line 144 def clear_data @data = (0...items.size).map { {} } end |
#id ⇒ Fixnum
Returns this subscription’s unique identification number.
77 78 79 |
# File 'lib/lightstreamer/subscription.rb', line 77 def id @id ||= ID_GENERATOR.next end |
#item_data(item_name) ⇒ Hash
Returns a copy of the current data of one of this subscription’s items.
186 187 188 189 190 191 |
# File 'lib/lightstreamer/subscription.rb', line 186 def item_data(item_name) index = @items.index item_name raise ArgumentError, 'Unknown item' unless index @data_mutex.synchronize { @data[index].dup } end |
#on_data(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, item_data, new_values|`. If #mode is :distinct then the values of item_data and new_values will be the same.
154 155 156 |
# File 'lib/lightstreamer/subscription.rb', line 154 def on_data(&callback) @data_mutex.synchronize { @callbacks[:on_data] << callback } end |
#on_end_of_snapshot(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an end-of-snapshot notification for this subscription. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name|`.
172 173 174 |
# File 'lib/lightstreamer/subscription.rb', line 172 def on_end_of_snapshot(&callback) @data_mutex.synchronize { @callbacks[:on_end_of_snapshot] << callback } end |
#on_overflow(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, overflow_size|`.
163 164 165 |
# File 'lib/lightstreamer/subscription.rb', line 163 def on_overflow(&callback) @data_mutex.synchronize { @callbacks[:on_overflow] << callback } end |
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.
215 216 217 218 219 |
# File 'lib/lightstreamer/subscription.rb', line 215 def process_stream_data(line) return true if UpdateMessage.parse(line, id, items, fields) return true if OverflowMessage.parse(line, id, items) return true if EndOfSnapshotMessage.parse(line, id, items) end |
#set_item_data(item_name, item_data) ⇒ Object
Sets the current data for the item with the specified name.
197 198 199 200 201 202 203 204 |
# File 'lib/lightstreamer/subscription.rb', line 197 def set_item_data(item_name, item_data) index = @items.index item_name raise ArgumentError, 'Unknown item' unless index raise ArgumentError, 'Item data must be a hash' unless item_data.is_a? Hash @data_mutex.synchronize { @data[index] = item_data.dup } end |
#start(options = {}) ⇒ Object
Starts streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.
95 96 97 98 |
# File 'lib/lightstreamer/subscription.rb', line 95 def start( = {}) session.control_request(*start_control_request_args()) unless @active @active = true end |
#start_control_request_args(options = {}) ⇒ Object
Returns the arguments to pass to to Lightstreamer::Session#control_request in order ot start this subscription with the given options.
106 107 108 109 110 111 112 113 114 |
# File 'lib/lightstreamer/subscription.rb', line 106 def start_control_request_args( = {}) operation = [:silent] ? :add_silent : :add = { LS_table: id, LS_mode: mode.to_s.upcase, LS_id: items, LS_schema: fields, LS_data_adapter: adapter, LS_requested_max_frequency: maximum_update_frequency, LS_selector: selector, LS_snapshot: .fetch(:snapshot, false) } [operation, ] end |
#stop ⇒ Object
Stops streaming data for this Lightstreamer subscription. If an error occurs then a LightstreamerError subclass will be raised.
125 126 127 128 |
# File 'lib/lightstreamer/subscription.rb', line 125 def stop session.control_request :delete, LS_table: id if @active @active = false end |
#unsilence ⇒ Object
Unsilences this subscription if it was initially started in silent mode (by passing ‘silent: true` to #start). If this subscription was not started in silent mode then this method has no effect. If an error occurs then a LightstreamerError subclass will be raised.
119 120 121 |
# File 'lib/lightstreamer/subscription.rb', line 119 def unsilence session.control_request :start, LS_table: id end |