Class: Lightstreamer::Subscription
- Inherits:
-
Object
- Object
- Lightstreamer::Subscription
- Defined in:
- lib/lightstreamer/subscription.rb
Overview
Describes a subscription that can be bound to a Lightstreamer session in order to consume its data. A subscription is described by the options passed to #initialize. Incoming data can be consumed by registering an asynchronous data callback using #add_data_callback, or by polling #retrieve_item_data. Subscriptions start receiving data only once they are attached to a Lightstreamer session using Lightstreamer::Session#subscribe.
Instance Attribute Summary collapse
-
#adapter ⇒ String
readonly
The name of the data adapter from the Lightstreamer session’s adapter set that should be used.
-
#fields ⇒ Array
readonly
The names of the fields to subscribe to on the items.
-
#id ⇒ Fixnum
readonly
The unique identification number of this subscription.
-
#items ⇒ Array
readonly
The names of the items to subscribe to.
-
#mode ⇒ :distinct, :merge
readonly
The operation mode of this subscription.
Class Method Summary collapse
-
.next_id ⇒ Fixnum
Returns the next unique ID to use for a new subscription.
Instance Method Summary collapse
-
#add_data_callback(&block) ⇒ Proc
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives.
-
#clear_data ⇒ Object
Clears all current data stored for this subscription.
-
#clear_data_for_item(item_name) ⇒ Object
Clears the current data stored for the specified item.
-
#initialize(options) ⇒ Subscription
constructor
Initializes a new Lightstreamer subscription with the specified options.
-
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription.
-
#remove_data_callback(block) ⇒ Object
Removes a data callback that was added by #add_data_callback.
-
#retrieve_item_data(item_name) ⇒ Hash, Array
Returns the current data of one of this subscription’s items.
Constructor Details
#initialize(options) ⇒ Subscription
Initializes a new Lightstreamer subscription with the specified options. This can then be passed to Lightstreamer::Session#subscribe to activate the subscription on a Lightstreamer session.
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/lightstreamer/subscription.rb', line 33 def initialize() @id = self.class.next_id @items = Array(.fetch(:items)) @fields = Array(.fetch(:fields)) @mode = .fetch(:mode).to_sym @adapter = [:adapter] @data_mutex = Mutex.new clear_data @data_callbacks = [] end |
Instance Attribute Details
#adapter ⇒ String (readonly)
Returns The name of the data adapter from the Lightstreamer session’s adapter set that should be used. If nil then the default data adapter will be used.
22 23 24 |
# File 'lib/lightstreamer/subscription.rb', line 22 def adapter @adapter end |
#fields ⇒ Array (readonly)
Returns The names of the fields to subscribe to on the items.
15 16 17 |
# File 'lib/lightstreamer/subscription.rb', line 15 def fields @fields end |
#id ⇒ Fixnum (readonly)
Returns The unique identification number of this subscription. This is used to identify the subscription in incoming Lightstreamer data.
9 10 11 |
# File 'lib/lightstreamer/subscription.rb', line 9 def id @id end |
#items ⇒ Array (readonly)
Returns The names of the items to subscribe to.
12 13 14 |
# File 'lib/lightstreamer/subscription.rb', line 12 def items @items end |
#mode ⇒ :distinct, :merge (readonly)
Returns The operation mode of this subscription.
18 19 20 |
# File 'lib/lightstreamer/subscription.rb', line 18 def mode @mode end |
Class Method Details
.next_id ⇒ Fixnum
Returns the next unique ID to use for a new subscription.
130 131 132 133 |
# File 'lib/lightstreamer/subscription.rb', line 130 def self.next_id @next_id ||= 0 @next_id += 1 end |
Instance Method Details
#add_data_callback(&block) ⇒ Proc
Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|subscription, item_name, item_data, new_values|`.
76 77 78 79 80 |
# File 'lib/lightstreamer/subscription.rb', line 76 def add_data_callback(&block) @data_mutex.synchronize { @data_callbacks << block } block end |
#clear_data ⇒ Object
Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.
49 50 51 52 53 |
# File 'lib/lightstreamer/subscription.rb', line 49 def clear_data @data_mutex.synchronize do @data = (0...items.size).map { { distinct: [], merge: {} }.fetch(mode) } end end |
#clear_data_for_item(item_name) ⇒ Object
Clears the current data stored for the specified item. This is important to do when #mode is :distinct as otherwise the incoming data will build up indefinitely.
59 60 61 62 63 64 65 66 |
# File 'lib/lightstreamer/subscription.rb', line 59 def clear_data_for_item(item_name) index = @items.index item_name raise ArgumentError, 'Unrecognized item name' unless index @data_mutex.synchronize do @data[index] = { distinct: [], merge: {} }.fetch(mode) end end |
#process_stream_data(line) ⇒ Boolean
Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/lightstreamer/subscription.rb', line 111 def process_stream_data(line) item_index, new_values = parse_stream_data line return false unless item_index @data_mutex.synchronize do data = @data[item_index] data << new_values if mode == :distinct data.merge!(new_values) if mode == :merge call_data_callbacks @items[item_index], data, new_values end true end |
#remove_data_callback(block) ⇒ Object
Removes a data callback that was added by #add_data_callback.
85 86 87 |
# File 'lib/lightstreamer/subscription.rb', line 85 def remove_data_callback(block) @data_mutex.synchronize { @data_callbacks.delete block } end |
#retrieve_item_data(item_name) ⇒ Hash, Array
Returns the current data of one of this subscription’s items.
95 96 97 98 99 100 101 102 |
# File 'lib/lightstreamer/subscription.rb', line 95 def retrieve_item_data(item_name) index = @items.index item_name raise ArgumentError, 'Unrecognized item name' unless index @data_mutex.synchronize do @data[index].dup end end |