Class: Lightstreamer::Session
- Inherits: Object
- Defined in: lib/lightstreamer/session.rb
Overview
This class is responsible for managing a Lightstreamer session, and along with the Subscription class forms the primary API for working with Lightstreamer. Start by creating a session with the desired server URL and other options (see #initialize), then call #connect to initiate the session. Once connected, create subscriptions using #build_subscription and then start streaming data by calling Lightstreamer::Subscription#start or #start_subscriptions. See the Subscription class for details on how to consume the streaming data as it arrives.
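The lifecycle described above can be sketched as follows. This is a minimal illustration rather than code from the library: `start_streaming` is a hypothetical helper, `FakeSession` is a stand-in that exists only so the sketch runs without a server, and the subscription options are left as a placeholder because they are documented on the Subscription class.

```ruby
# Hypothetical helper: drives the documented lifecycle on any object that
# responds to #connect, #build_subscription and #start_subscriptions.
def start_streaming(session, subscription_options)
  session.connect
  subscription = session.build_subscription(subscription_options)

  # #start_subscriptions returns one entry per subscription: an error or nil.
  error = session.start_subscriptions([subscription]).first
  raise error if error

  subscription
end

# Stand-in for Lightstreamer::Session so the sketch is runnable offline.
class FakeSession
  attr_reader :calls

  def initialize
    @calls = []
  end

  def connect
    @calls << :connect
  end

  def build_subscription(options)
    @calls << :build_subscription
    { options: options }
  end

  def start_subscriptions(subscriptions, _options = {})
    @calls << :start_subscriptions
    subscriptions.map { nil } # nil means "no error" for that subscription
  end
end

session = FakeSession.new
subscription = start_streaming(session, { placeholder: true })
```

With a real session the same three calls would be made on a Lightstreamer::Session instance, and the returned subscription would be a Lightstreamer::Subscription.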
Instance Attribute Summary
- #adapter_set ⇒ String? (readonly)
  The name of the adapter set to request from the Lightstreamer server.
- #password ⇒ String? (readonly)
  The password to connect to the Lightstreamer server with.
- #polling_enabled ⇒ Boolean
  Whether polling mode is enabled.
- #requested_maximum_bandwidth ⇒ Float
  The server-side bandwidth constraint on data usage, expressed in kbps.
- #server_url ⇒ String (readonly)
  The URL of the Lightstreamer server to connect to.
- #username ⇒ String? (readonly)
  The username to connect to the Lightstreamer server with.
Instance Method Summary
- #build_subscription(options) ⇒ Subscription
  Builds a new subscription for this session with the specified options.
- #connect ⇒ Object
  Connects a new Lightstreamer session using the details passed to #initialize.
- #connected? ⇒ Boolean
  Returns whether this Lightstreamer session is currently connected and has an active stream connection.
- #control_request(query = {}) ⇒ Object
  Sends a request to this session's control connection.
- #disconnect ⇒ Object
  Disconnects this Lightstreamer session and terminates the session on the server.
- #force_rebind ⇒ Object
  Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client.
- #initialize(options = {}) ⇒ Session (constructor)
  Initializes this new Lightstreamer session with the passed options.
- #on_error(&callback) ⇒ Object
  Adds the passed block to the list of callbacks that will be run when this session encounters an error on its processing thread caused by an error with the stream connection.
- #on_message_result(&callback) ⇒ Object
  Adds the passed block to the list of callbacks that will be run when the outcome of one or more asynchronous message sends arrives.
- #perform_subscription_actions(actions) ⇒ Array<LightstreamerError, nil>
  This method takes an array of subscriptions and actions to perform on those subscriptions.
- #remove_subscription(subscription) ⇒ Object
  Stops the specified subscription and removes it from this session.
- #remove_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>
  Stops the specified subscriptions and removes them from this session.
- #send_message(message, options = {}) ⇒ Object
  Sends a custom message to the Lightstreamer server.
- #session_id ⇒ String?
  Returns the ID of the currently active Lightstreamer session, or nil if there is no active session.
- #start_subscriptions(subscriptions, options = {}) ⇒ Array<LightstreamerError, nil>
  This method performs Lightstreamer::Subscription#start on all the passed subscriptions.
- #stop_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>
  This method performs Lightstreamer::Subscription#stop on all the passed subscriptions.
Constructor Details
#initialize(options = {}) ⇒ Session
Initializes this new Lightstreamer session with the passed options.
# File 'lib/lightstreamer/session.rb', line 53
def initialize(options = {})
  @mutex = Mutex.new

  @server_url = options.fetch :server_url
  @username = options[:username]
  @password = options[:password]
  @adapter_set = options[:adapter_set]
  @requested_maximum_bandwidth = options[:requested_maximum_bandwidth].to_f
  @polling_enabled = options[:polling_enabled]

  @subscriptions = []
  @callbacks = { on_message_result: [], on_error: [] }
end
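The way the constructor reads its options hash has two consequences worth spelling out: :server_url is mandatory because it is read with Hash#fetch, and an omitted :requested_maximum_bandwidth becomes 0.0 because nil.to_f is 0.0. The sketch below reproduces just that behavior in a hypothetical helper, `read_session_options`, which is not part of the library; the URL is a made-up example value.

```ruby
# Hypothetical helper that mirrors how #initialize reads its options hash.
def read_session_options(options = {})
  {
    # Required: Hash#fetch raises KeyError when the key is absent.
    server_url: options.fetch(:server_url),
    # Optional: plain lookup returns nil when the key is absent.
    username: options[:username],
    # Optional: nil.to_f is 0.0, which the docs describe as "no limit".
    requested_maximum_bandwidth: options[:requested_maximum_bandwidth].to_f
  }
end

settings = read_session_options(server_url: 'http://push.example.com')

# Omitting :server_url fails immediately rather than at connect time.
missing_url_error =
  begin
    read_session_options(username: 'user')
    nil
  rescue KeyError => e
    e
  end
```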
Instance Attribute Details
#adapter_set ⇒ String? (readonly)
The name of the adapter set to request from the Lightstreamer server. Set by #initialize.
# File 'lib/lightstreamer/session.rb', line 26
def adapter_set
  @adapter_set
end
#password ⇒ String? (readonly)
The password to connect to the Lightstreamer server with. Set by #initialize.
# File 'lib/lightstreamer/session.rb', line 21
def password
  @password
end
#polling_enabled ⇒ Boolean
Whether polling mode is enabled. By default long-running HTTP connections will be used to stream incoming data, but if polling is enabled then repeated short polling requests will be used instead. Polling may work better if there is intermediate buffering on the network that affects timely delivery of data on long-running streaming connections. Polling mode can be turned off and on for a connected session by setting #polling_enabled and then calling #force_rebind.
# File 'lib/lightstreamer/session.rb', line 40
def polling_enabled
  @polling_enabled
end
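Switching polling mode on a connected session is a two-step operation: set the attribute, then call #force_rebind. A minimal sketch, assuming only those two documented members; `switch_polling` is a hypothetical helper and `FakePollingSession` is a stand-in used so the example runs without a server.

```ruby
# Hypothetical helper: changes the polling mode of a session and asks the
# server to rebind so that the new setting takes effect.
def switch_polling(session, enabled)
  session.polling_enabled = enabled
  session.force_rebind
end

# Stand-in for a connected Lightstreamer::Session; it only counts rebinds.
FakePollingSession = Struct.new(:polling_enabled, :rebinds) do
  def force_rebind
    self.rebinds += 1
  end
end

fake = FakePollingSession.new(false, 0)
switch_polling(fake, true)
```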
#requested_maximum_bandwidth ⇒ Float
The server-side bandwidth constraint on data usage, expressed in kbps. If this is zero then no limit is applied.
# File 'lib/lightstreamer/session.rb', line 31
def requested_maximum_bandwidth
  @requested_maximum_bandwidth
end
#server_url ⇒ String (readonly)
The URL of the Lightstreamer server to connect to. Set by #initialize.
# File 'lib/lightstreamer/session.rb', line 11
def server_url
  @server_url
end
#username ⇒ String? (readonly)
The username to connect to the Lightstreamer server with. Set by #initialize.
# File 'lib/lightstreamer/session.rb', line 16
def username
  @username
end
Instance Method Details
#build_subscription(options) ⇒ Subscription
Builds a new subscription for this session with the specified options. Note that this does not activate the subscription; Lightstreamer::Subscription#start must be called to actually start streaming the subscription's data. See the Lightstreamer::Subscription class for more details.
# File 'lib/lightstreamer/session.rb', line 138
def build_subscription(options)
  subscription = Subscription.new self, options

  @mutex.synchronize { @subscriptions << subscription }

  subscription
end
#connect ⇒ Object
Connects a new Lightstreamer session using the details passed to #initialize. If an error occurs then a LightstreamerError subclass will be raised.
# File 'lib/lightstreamer/session.rb', line 69
def connect
  return if @stream_connection

  @stream_connection = StreamConnection.new self
  @stream_connection.connect

  create_processing_thread
rescue StandardError
  @stream_connection = nil
  raise
end
#connected? ⇒ Boolean
Returns whether this Lightstreamer session is currently connected and has an active stream connection.
# File 'lib/lightstreamer/session.rb', line 84
def connected?
  !@stream_connection.nil?
end
#control_request(query = {}) ⇒ Object
Sends a request to this session's control connection. If an error occurs then a LightstreamerError subclass will be raised.
# File 'lib/lightstreamer/session.rb', line 287
def control_request(query = {})
  PostRequest.execute control_request_url, query.merge(LS_session: session_id)
end
#disconnect ⇒ Object
Disconnects this Lightstreamer session and terminates the session on the server. All worker threads are exited, and all subscriptions created during the connected session can no longer be used.
# File 'lib/lightstreamer/session.rb', line 97
def disconnect
  control_request LS_op: :destroy if @stream_connection

  @processing_thread&.join 5
ensure
  @stream_connection&.disconnect
  @processing_thread&.exit

  @subscriptions.each { |subscription| subscription.after_control_request :stop }
  @subscriptions = []

  @processing_thread = @stream_connection = nil
end
#force_rebind ⇒ Object
Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client. The Lightstreamer server requires closure and re-establishment of the stream connection periodically during normal operation; this method simply allows such a reconnection to be requested explicitly by the client. This is particularly useful after #polling_enabled has been changed because it forces the stream connection to rebind using the new setting. If an error occurs then a LightstreamerError subclass will be raised.
# File 'lib/lightstreamer/session.rb', line 117
def force_rebind
  control_request LS_op: :force_rebind if @stream_connection
end
#on_error(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when this session encounters an error on its processing thread caused by an error with the stream connection. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The argument passed to the block is |error|, which will be a LightstreamerError subclass detailing the error that occurred.
# File 'lib/lightstreamer/session.rb', line 297
def on_error(&callback)
  @mutex.synchronize { @callbacks[:on_error] << callback }
end
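Because the block runs on a worker thread, one simple way to keep it thread-safe is to have it do nothing except push the error onto a Queue, which the main thread then drains. The sketch below illustrates that pattern only: `FakeErrorSource` and its `simulate_error` method are hypothetical stand-ins, not part of the library, and a plain StandardError is used in place of a LightstreamerError subclass.

```ruby
# Stand-in that stores callbacks under a mutex, as #on_error does, and
# invokes them from a separate thread, as a session's worker thread would.
class FakeErrorSource
  def initialize
    @mutex = Mutex.new
    @callbacks = []
  end

  def on_error(&callback)
    @mutex.synchronize { @callbacks << callback }
  end

  # Hypothetical trigger for this sketch; not part of the library's API.
  def simulate_error(error)
    callbacks = @mutex.synchronize { @callbacks.dup }
    Thread.new { callbacks.each { |callback| callback.call(error) } }.join
  end
end

# Queue is thread-safe, so the callback only needs to push onto it.
errors = Queue.new

source = FakeErrorSource.new
source.on_error { |error| errors << error }
source.simulate_error(StandardError.new('stream connection lost'))

# Back on the main thread: take the error off the queue and handle it here.
received = errors.pop
```

The same hand-off works for #on_message_result, with the block pushing its arguments onto the queue instead.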
#on_message_result(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the outcome of one or more asynchronous message sends arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are |sequence, numbers, error|.
# File 'lib/lightstreamer/session.rb', line 279
def on_message_result(&callback)
  @mutex.synchronize { @callbacks[:on_message_result] << callback }
end
#perform_subscription_actions(actions) ⇒ Array<LightstreamerError, nil>
This method takes an array of subscriptions and actions to perform on those subscriptions. Each entry in the array is a hash with :subscription and :action keys and an optional :options key. The supported actions are :start, :unsilence and :stop. Calling Lightstreamer::Subscription#start, Lightstreamer::Subscription#unsilence or Lightstreamer::Subscription#stop on each subscription individually would also work but requires a separate POST request to be sent for each action, whereas this method performs all of the specified actions in a single POST request, which is significantly faster for a large number of actions. The return value is an array with one entry per action and indicates the error state returned by the server for that action, or nil if no error occurred. It will have the same number of entries as the passed actions array.
# File 'lib/lightstreamer/session.rb', line 223
def perform_subscription_actions(actions)
  request_bodies = actions.map do |hash|
    PostRequest.request_body hash.fetch(:subscription).(hash.fetch(:action), hash[:options])
  end

  errors = PostRequest.execute_multiple control_request_url, request_bodies

  # Update the state of subscriptions that did not have an error
  errors.each_with_index do |error, index|
    actions[index][:subscription].after_control_request actions[index][:action] unless error
  end
end
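Since the returned array lines up index-for-index with the actions that were passed in, the two can be zipped together to find out which actions failed. A minimal sketch under stated assumptions: `failed_actions` is a hypothetical helper, symbols stand in for Subscription objects, and the results array is made up for illustration rather than returned by a server.

```ruby
# Hypothetical helper: pairs each action with the entry returned for it and
# keeps only the pairs where the server reported an error.
def failed_actions(actions, results)
  actions.zip(results).reject { |_action, error| error.nil? }
end

# The hash shape matches what #perform_subscription_actions reads:
# :subscription and :action are required, :options is optional.
actions = [
  { subscription: :prices, action: :start, options: {} },
  { subscription: :news, action: :unsilence },
  { subscription: :trades, action: :stop }
]

# Made-up return value: the second action failed, the others succeeded.
results = [nil, StandardError.new('illustrative failure'), nil]

failures = failed_actions(actions, results)
```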
#remove_subscription(subscription) ⇒ Object
Stops the specified subscription and removes it from this session. If an error occurs then a LightstreamerError subclass will be raised. To just stop a subscription with the option of restarting it at a later date call Lightstreamer::Subscription#stop on the subscription itself.
# File 'lib/lightstreamer/session.rb', line 151
def remove_subscription(subscription)
  errors = remove_subscriptions [subscription]

  raise errors.first if errors.first
end
#remove_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>
Stops the specified subscriptions and removes them from this session. To just stop subscriptions with the option of restarting them at a later date use #stop_subscriptions or Lightstreamer::Subscription#stop. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's stop request, or nil if no error occurred.
# File 'lib/lightstreamer/session.rb', line 165
def remove_subscriptions(subscriptions)
  errors = stop_subscriptions subscriptions

  @mutex.synchronize do
    subscriptions.reject(&:active).each do |subscription|
      @subscriptions.delete subscription
    end
  end

  errors
end
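One detail of the source is easy to miss: only subscriptions that are no longer active after the stop request are dropped from the session, so a subscription whose stop failed stays tracked. The sketch below isolates that filtering step; `FakeSubscription` is a hypothetical stand-in exposing only the `active` reader the source relies on.

```ruby
# Stand-in for Lightstreamer::Subscription with just a name and active flag.
FakeSubscription = Struct.new(:name, :active)

stopped = FakeSubscription.new('stopped', false)     # stop request succeeded
still_active = FakeSubscription.new('failed', true)  # stop request failed

tracked = [stopped, still_active]
requested = [stopped, still_active]

# Same filtering as #remove_subscriptions: skip anything still active.
requested.reject(&:active).each { |subscription| tracked.delete subscription }
```

After this runs, `tracked` still contains the subscription whose stop did not succeed, which matches the per-subscription error reported in the return value.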
#send_message(message, options = {}) ⇒ Object
Sends a custom message to the Lightstreamer server. Message sending can be done synchronously or asynchronously. By default the message will be sent synchronously, i.e. the message will be processed by the server and if an error occurs a LightstreamerError subclass will be raised immediately. However, if the :async option is true then the message will be sent asynchronously, and the result of the message send will be reported to all callbacks that have been registered via #on_message_result. If :async is set to true then the :sequence and :number options must also be specified.
# File 'lib/lightstreamer/session.rb', line 263
def send_message(message, options = {})
  url = URI.join(@stream_connection.control_address, '/lightstreamer/send_message.txt').to_s

  query = { LS_session: session_id, LS_message: message }
  query[:LS_sequence] = options.fetch(:sequence) if options[:async]
  query[:LS_msg_prog] = options.fetch(:number) if options[:async]
  query[:LS_max_wait] = options[:max_wait] if options[:max_wait]

  PostRequest.execute url, query
end
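The effect of the :async, :sequence, :number and :max_wait options is easiest to see by looking at the query that gets built. The sketch below copies only the query construction into a hypothetical helper, `build_message_query`, so it can run without a server; the session ID, message text and option values are made-up examples.

```ruby
# Hypothetical helper mirroring the query construction in #send_message.
def build_message_query(session_id, message, options = {})
  query = { LS_session: session_id, LS_message: message }
  # With :async set, :sequence and :number are read with fetch, so leaving
  # either one out raises KeyError before any request would be sent.
  query[:LS_sequence] = options.fetch(:sequence) if options[:async]
  query[:LS_msg_prog] = options.fetch(:number) if options[:async]
  query[:LS_max_wait] = options[:max_wait] if options[:max_wait]
  query
end

sync_query = build_message_query('S1', 'hello')
async_query = build_message_query('S1', 'hello', async: true, sequence: 'chat', number: 1)

# Asynchronous send without :sequence and :number is rejected up front.
incomplete_error =
  begin
    build_message_query('S1', 'hello', async: true)
    nil
  rescue KeyError => e
    e
  end
```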
#session_id ⇒ String?
Returns the ID of the currently active Lightstreamer session, or nil if there is no active session.
# File 'lib/lightstreamer/session.rb', line 91
def session_id
  @stream_connection&.session_id
end
#start_subscriptions(subscriptions, options = {}) ⇒ Array<LightstreamerError, nil>
This method performs Lightstreamer::Subscription#start on all the passed subscriptions. Calling Lightstreamer::Subscription#start on each subscription individually would also work but requires a separate POST request to be sent for every subscription, whereas this method starts all of the passed subscriptions in a single POST request, which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's start request, or nil if no error occurred.
# File 'lib/lightstreamer/session.rb', line 188
def start_subscriptions(subscriptions, options = {})
  details = subscriptions.map { |subscription| { subscription: subscription, action: :start, options: options } }

  perform_subscription_actions details
end
#stop_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>
This method performs Lightstreamer::Subscription#stop on all the passed subscriptions. Calling Lightstreamer::Subscription#stop on each subscription individually would also work but requires a separate POST request to be sent for every subscription, whereas this method stops all of the passed subscriptions in a single POST request, which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's stop request, or nil if no error occurred.
# File 'lib/lightstreamer/session.rb', line 203
def stop_subscriptions(subscriptions)
  details = subscriptions.map { |subscription| { subscription: subscription, action: :stop } }

  perform_subscription_actions details
end