Class: Lightstreamer::Session
- Inherits:
-
Object
- Object
- Lightstreamer::Session
- Defined in:
- lib/lightstreamer/session.rb
Overview
This class is responsible for managing a Lightstreamer session, and along with the Subscription class forms the primary API for working with Lightstreamer.
Instance Attribute Summary collapse
-
#adapter_set ⇒ String?
readonly
The name of the adapter set to request from the Lightstreamer server.
-
#error ⇒ LightstreamerError?
readonly
If an error occurs on the stream connection that causes the session to terminate then details of the error will be stored in this attribute.
-
#password ⇒ String?
readonly
The password to connect to the Lightstreamer server with.
-
#requested_maximum_bandwidth ⇒ Float
The server-side bandwidth constraint on data usage, expressed in kbps.
-
#server_url ⇒ String
readonly
The URL of the Lightstreamer server to connect to.
-
#username ⇒ String?
readonly
The username to connect to the Lightstreamer server with.
Instance Method Summary collapse
-
#build_subscription(options) ⇒ Subscription
Builds a new subscription for this session with the specified options.
-
#bulk_subscription_start(*subscriptions) ⇒ Array<LightstreamerError, nil>
This method performs a bulk Lightstreamer::Subscription#start on all the passed subscriptions.
-
#connect ⇒ Object
Connects a new Lightstreamer session using the details passed to #initialize.
-
#connected? ⇒ Boolean
Returns whether this Lightstreamer session is currently connected and has an active stream connection.
-
#control_request(operation, options = {}) ⇒ Object
Sends a request to the control connection.
-
#disconnect ⇒ Object
Disconnects this Lightstreamer session and terminates the session on the server.
-
#force_rebind ⇒ Object
Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client.
-
#initialize(options = {}) ⇒ Session
constructor
Initializes this new Lightstreamer session with the passed options.
-
#on_message_result(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the outcome of an asynchronous message send arrives.
-
#remove_subscription(subscription) ⇒ Object
Stops the specified subscription and removes it from this session.
-
#send_message(message, options = {}) ⇒ Object
Sends a custom message to the Lightstreamer server.
-
#session_id ⇒ String?
Returns the ID of the currently active Lightstreamer session, or
nilif there is no active session.
Constructor Details
#initialize(options = {}) ⇒ Session
Initializes this new Lightstreamer session with the passed options.
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lightstreamer/session.rb', line 46 def initialize( = {}) @subscriptions = [] @subscriptions_mutex = Mutex.new @server_url = .fetch :server_url @username = [:username] @password = [:password] @adapter_set = [:adapter_set] @requested_maximum_bandwidth = [:requested_maximum_bandwidth].to_f @on_message_result_callbacks = [] end |
Instance Attribute Details
#adapter_set ⇒ String? (readonly)
The name of the adapter set to request from the Lightstreamer server. Set by #initialize.
23 24 25 |
# File 'lib/lightstreamer/session.rb', line 23 def adapter_set @adapter_set end |
#error ⇒ LightstreamerError? (readonly)
If an error occurs on the stream connection that causes the session to terminate then details of the error will be stored in this attribute. If the session is terminated as a result of calling #disconnect then the error will be Errors::SessionEndError.
30 31 32 |
# File 'lib/lightstreamer/session.rb', line 30 def error @error end |
#password ⇒ String? (readonly)
The password to connect to the Lightstreamer server with. Set by #initialize.
18 19 20 |
# File 'lib/lightstreamer/session.rb', line 18 def password @password end |
#requested_maximum_bandwidth ⇒ Float
The server-side bandwidth constraint on data usage, expressed in kbps. If this is zero then no limit is applied.
35 36 37 |
# File 'lib/lightstreamer/session.rb', line 35 def requested_maximum_bandwidth @requested_maximum_bandwidth end |
#server_url ⇒ String (readonly)
The URL of the Lightstreamer server to connect to. Set by #initialize.
8 9 10 |
# File 'lib/lightstreamer/session.rb', line 8 def server_url @server_url end |
#username ⇒ String? (readonly)
The username to connect to the Lightstreamer server with. Set by #initialize.
13 14 15 |
# File 'lib/lightstreamer/session.rb', line 13 def username @username end |
Instance Method Details
#build_subscription(options) ⇒ Subscription
Builds a new subscription for this session with the specified options. Note that ths does not activate the subscription, Lightstreamer::Subscription#start must be called to actually start streaming the subscription’s data. See the Lightstreamer::Subscription class for more details.
132 133 134 135 136 137 138 |
# File 'lib/lightstreamer/session.rb', line 132 def build_subscription() subscription = Subscription.new self, @subscriptions_mutex.synchronize { @subscriptions << subscription } subscription end |
#bulk_subscription_start(*subscriptions) ⇒ Array<LightstreamerError, nil>
This method performs a bulk Lightstreamer::Subscription#start on all the passed subscriptions. Calling Lightstreamer::Subscription#start on each of them individually would also work but requires a separate POST request to be sent for every subscription. This request starts all of the passed subscriptions in a single POST request which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription’s start request, or nil if no error occurred.
158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/lightstreamer/session.rb', line 158 def bulk_subscription_start(*subscriptions) request_bodies = subscriptions.map do |subscription| PostRequest.request_body session_id, *subscription.start_control_request_args end errors = PostRequest.bulk_execute @stream_connection.control_address, request_bodies # Set @active to true on all subscriptions that did not have an error errors.each_with_index do |error, index| subscriptions[index].instance_variable_set :@active, true if error.nil? end end |
#connect ⇒ Object
Connects a new Lightstreamer session using the details passed to #initialize. If an error occurs then a LightstreamerError subclass will be raised.
61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/lightstreamer/session.rb', line 61 def connect return if @stream_connection @error = nil @stream_connection = StreamConnection.new self @stream_connection.connect create_processing_thread rescue @stream_connection = nil raise end |
#connected? ⇒ Boolean
Returns whether this Lightstreamer session is currently connected and has an active stream connection.
78 79 80 |
# File 'lib/lightstreamer/session.rb', line 78 def connected? !@stream_connection.nil? end |
#control_request(operation, options = {}) ⇒ Object
Sends a request to the control connection. If an error occurs then a LightstreamerError subclass will be raised.
221 222 223 224 225 |
# File 'lib/lightstreamer/session.rb', line 221 def control_request(operation, = {}) url = URI.join(@stream_connection.control_address, '/lightstreamer/control.txt').to_s PostRequest.execute url, .merge(LS_session: session_id, LS_op: operation) end |
#disconnect ⇒ Object
Disconnects this Lightstreamer session and terminates the session on the server. All worker threads are exited.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/lightstreamer/session.rb', line 90 def disconnect control_request :destroy if @stream_connection @processing_thread.join 5 if @processing_thread ensure @stream_connection.disconnect if @stream_connection @processing_thread.exit if @processing_thread @subscriptions.each do |subscription| subscription.instance_variable_set :@active, false end @processing_thread = @stream_connection = nil end |
#force_rebind ⇒ Object
Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client. The Lightstreamer server requires closure and re-establishment of the stream connection periodically during normal operation, this method just allows such a reconnection to be requested explicitly by the client. If an error occurs then a LightstreamerError subclass will be raised.
109 110 111 112 113 |
# File 'lib/lightstreamer/session.rb', line 109 def force_rebind return unless @stream_connection control_request :force_rebind end |
#on_message_result(&callback) ⇒ Object
Adds the passed block to the list of callbacks that will be run when the outcome of an asynchronous message send arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|sequence, numbers, error|`.
213 214 215 |
# File 'lib/lightstreamer/session.rb', line 213 def (&callback) @on_message_result_callbacks << callback end |
#remove_subscription(subscription) ⇒ Object
Stops the specified subscription and removes it from this session. If an error occurs then a LightstreamerError subclass will be raised. To just stop a subscription with the option of restarting it at a later date call Lightstreamer::Subscription#stop on the subscription itself.
143 144 145 146 147 |
# File 'lib/lightstreamer/session.rb', line 143 def remove_subscription(subscription) subscription.stop @subscriptions_mutex.synchronize { @subscriptions.delete subscription } end |
#send_message(message, options = {}) ⇒ Object
Sends a custom message to the Lightstreamer server. Message sending can be done synchronously or asynchronously. By default the message will be sent synchronously, i.e. the message will be processed by the server and if an error occurs a LightstreamerError subclass will be raised immediately. However, if the :async option is true then the message will be sent asynchronously, and the result of the message send will be reported to all callbacks that have been registered via #on_message_result.
197 198 199 200 201 202 203 204 205 206 |
# File 'lib/lightstreamer/session.rb', line 197 def (, = {}) url = URI.join(@stream_connection.control_address, '/lightstreamer/send_message.txt').to_s query = { LS_session: session_id, LS_message: } query[:LS_sequence] = .fetch(:sequence) if [:async] query[:LS_msg_prog] = .fetch(:number) if [:async] query[:LS_max_wait] = [:max_wait] if [:max_wait] PostRequest.execute url, query end |
#session_id ⇒ String?
Returns the ID of the currently active Lightstreamer session, or nil if there is no active session.
85 86 87 |
# File 'lib/lightstreamer/session.rb', line 85 def session_id @stream_connection && @stream_connection.session_id end |