Class: Lightstreamer::Session

Inherits:
Object
Defined in:
lib/lightstreamer/session.rb

Overview

This class manages a Lightstreamer session and, together with the Subscription class, forms the primary API for working with Lightstreamer. Start by creating a session with the desired server URL and other options (see #initialize), then call #connect to initiate the session. Once connected, create subscriptions using #build_subscription and start streaming data by calling Lightstreamer::Subscription#start or #start_subscriptions. See the Subscription class for details on how to consume the streaming data as it arrives.
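
The flow described above can be sketched roughly as follows. This is an illustrative sketch rather than the library's own example: `stream_items` is a hypothetical helper, the item and field names in the usage comment are placeholders, and the session is passed in so the helper works with any object that responds to the methods documented on this page.

```ruby
# Hypothetical helper: walks a session through connect -> build -> start.
# `session` is expected to be a Lightstreamer::Session or a compatible object.
def stream_items(session, items:, fields:, mode: :merge)
  session.connect

  subscription = session.build_subscription items: items, fields: fields, mode: mode

  # start_subscriptions returns one entry per subscription: nil on success,
  # or the error reported by the server for that subscription.
  errors = session.start_subscriptions [subscription]
  raise errors.first if errors.first

  subscription
end

# Usage (assumes the lightstreamer gem is loaded and a server is reachable;
# the URL and names below are placeholders):
#
#   session = Lightstreamer::Session.new server_url: 'http://push.example.com'
#   stream_items session, items: ['item1'], fields: ['field1']
```

How the returned subscription delivers data is covered by the Subscription class, so the sketch stops once streaming has been requested.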

Constructor Details

#initialize(options = {}) ⇒ Session

Initializes this new Lightstreamer session with the passed options.

Parameters:

  • options (Hash) (defaults to: {})

    The options to create the session with.

Options Hash (options):

  • :server_url (String)

    The URL of the Lightstreamer server. Required.

  • :username (String)

    The username to connect to the server with.

  • :password (String)

    The password to connect to the server with.

  • :adapter_set (String)

    The name of the adapter set to request from the server.

  • :requested_maximum_bandwidth (Float)

    The server-side bandwidth constraint on data usage, expressed in kbps. Defaults to zero which means no limit is applied.

  • :polling_enabled (Boolean)

    Whether polling mode is enabled. See #polling_enabled for details. Defaults to false.



# File 'lib/lightstreamer/session.rb', line 53

def initialize(options = {})
  @mutex = Mutex.new

  @server_url = options.fetch :server_url
  @username = options[:username]
  @password = options[:password]
  @adapter_set = options[:adapter_set]
  @requested_maximum_bandwidth = options[:requested_maximum_bandwidth].to_f
  @polling_enabled = options[:polling_enabled]

  @subscriptions = []
  @callbacks = { on_message_result: [], on_error: [] }
end
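
To illustrate how the constructor treats its options, here is a minimal stand-in that mirrors only part of the option handling in the source above. `OptionsSketch` is a hypothetical class written for this example and is not part of the library.

```ruby
# Hypothetical stand-in that copies some of the option handling of Session#initialize.
class OptionsSketch
  attr_reader :server_url, :requested_maximum_bandwidth, :polling_enabled

  def initialize(options = {})
    @server_url = options.fetch :server_url # raises KeyError when missing
    # nil.to_f is 0.0, so an omitted bandwidth means "no limit"
    @requested_maximum_bandwidth = options[:requested_maximum_bandwidth].to_f
    @polling_enabled = options[:polling_enabled] # stays nil (falsy) unless given
  end
end

sketch = OptionsSketch.new server_url: 'http://push.example.com'
sketch.requested_maximum_bandwidth # 0.0, i.e. no limit

begin
  OptionsSketch.new # :server_url is required
rescue KeyError => e
  warn "missing option: #{e.message}"
end
```

The point of the sketch is that only :server_url is enforced at construction time; every other option may be omitted.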

Instance Attribute Details

#adapter_set ⇒ String? (readonly)

The name of the adapter set to request from the Lightstreamer server. Set by #initialize.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/session.rb', line 26

def adapter_set
  @adapter_set
end

#password ⇒ String? (readonly)

The password to connect to the Lightstreamer server with. Set by #initialize.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/session.rb', line 21

def password
  @password
end

#polling_enabled ⇒ Boolean

Whether polling mode is enabled. By default long-running HTTP connections will be used to stream incoming data, but if polling is enabled then repeated short polling requests will be used instead. Polling may work better if there is intermediate buffering on the network that affects timely delivery of data on long-running streaming connections. Polling mode can be turned off and on for a connected session by setting #polling_enabled and then calling #force_rebind.

Returns:

  • (Boolean)

    whether polling mode is enabled.



# File 'lib/lightstreamer/session.rb', line 40

def polling_enabled
  @polling_enabled
end
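
Switching a connected session over to polling, as described above, might look like the following sketch. `switch_to_polling` is a hypothetical helper, and it assumes the session exposes a `polling_enabled=` writer alongside the #connected? and #force_rebind methods documented on this page.

```ruby
# Hypothetical helper: switches a connected session to polling mode.
def switch_to_polling(session)
  return false unless session.connected?

  session.polling_enabled = true
  # Ask the server to end the current stream connection so that the client
  # rebinds using the new setting.
  session.force_rebind
  true
end
```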

#requested_maximum_bandwidth ⇒ Float

The server-side bandwidth constraint on data usage, expressed in kbps. If this is zero then no limit is applied.

Returns:

  • (Float)


# File 'lib/lightstreamer/session.rb', line 31

def requested_maximum_bandwidth
  @requested_maximum_bandwidth
end

#server_url ⇒ String (readonly)

The URL of the Lightstreamer server to connect to. Set by #initialize.

Returns:

  • (String)


# File 'lib/lightstreamer/session.rb', line 11

def server_url
  @server_url
end

#username ⇒ String? (readonly)

The username to connect to the Lightstreamer server with. Set by #initialize.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/session.rb', line 16

def username
  @username
end

Instance Method Details

#build_subscription(options) ⇒ Subscription

Builds a new subscription for this session with the specified options. Note that this does not activate the subscription: Lightstreamer::Subscription#start must be called to actually start streaming the subscription's data. See the Lightstreamer::Subscription class for more details.

Parameters:

  • options (Hash)

    The options to create the subscription with.

Options Hash (options):

  • :items (Array)

    The names of the items to subscribe to. Required.

  • :fields (Array)

    The names of the fields to subscribe to on the items. Required.

  • :mode (:command, :distinct, :merge, :raw)

    The operation mode of the subscription. Required.

  • :adapter (String)

    The name of the data adapter from this session's adapter set that should be used. If this is not set or is set to nil then the default data adapter will be used.

  • :selector (String)

    The selector for table items. Optional.

  • :maximum_update_frequency (Float, :unfiltered)

    The maximum number of updates the subscription should receive per second. Defaults to zero which means there is no limit on the update frequency. If set to :unfiltered then unfiltered streaming will be used for the subscription and it is possible for overflows to occur (see Lightstreamer::Subscription#on_overflow).

Returns:

  • (Subscription)



# File 'lib/lightstreamer/session.rb', line 138

def build_subscription(options)
  subscription = Subscription.new self, options

  @mutex.synchronize { @subscriptions << subscription }

  subscription
end

#connect ⇒ Object

Connects a new Lightstreamer session using the details passed to #initialize. If an error occurs then a LightstreamerError subclass will be raised.



# File 'lib/lightstreamer/session.rb', line 69

def connect
  return if @stream_connection

  @stream_connection = StreamConnection.new self
  @stream_connection.connect

  create_processing_thread
rescue StandardError
  @stream_connection = nil
  raise
end

#connected? ⇒ Boolean

Returns whether this Lightstreamer session is currently connected and has an active stream connection.

Returns:

  • (Boolean)


# File 'lib/lightstreamer/session.rb', line 84

def connected?
  !@stream_connection.nil?
end

#control_request(query = {}) ⇒ Object

Sends a request to this session's control connection. If an error occurs then a LightstreamerError subclass will be raised.

Parameters:

  • query (Hash) (defaults to: {})

    The details of the control request query.



# File 'lib/lightstreamer/session.rb', line 287

def control_request(query = {})
  PostRequest.execute control_request_url, query.merge(LS_session: session_id)
end

#disconnect ⇒ Object

Disconnects this Lightstreamer session and terminates the session on the server. All worker threads are exited, and all subscriptions created during the connected session can no longer be used.



# File 'lib/lightstreamer/session.rb', line 97

def disconnect
  control_request LS_op: :destroy if @stream_connection

  @processing_thread&.join 5
ensure
  @stream_connection&.disconnect
  @processing_thread&.exit

  @subscriptions.each { |subscription| subscription.after_control_request :stop }
  @subscriptions = []

  @processing_thread = @stream_connection = nil
end
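
Because #disconnect releases the stream connection and worker thread, callers may want to guarantee it runs even when their own code raises. One possible sketch, using a hypothetical `with_connected_session` helper that is not part of the library:

```ruby
# Hypothetical helper: connects, yields the session, and always disconnects,
# even when the block (or #connect itself) raises.
def with_connected_session(session)
  session.connect
  yield session
ensure
  session.disconnect
end

# Usage (assumes `session` is a Lightstreamer::Session):
#
#   with_connected_session(session) do |s|
#     # build and start subscriptions here
#   end
```

Calling #disconnect after a failed #connect appears safe given the source above, since every step in it is guarded by a check for the stream connection or by safe navigation.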

#force_rebind ⇒ Object

Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client. The Lightstreamer server requires closure and re-establishment of the stream connection periodically during normal operation; this method just allows such a reconnection to be requested explicitly by the client. This is particularly useful after #polling_enabled has been changed because it forces the stream connection to rebind using the new setting. If an error occurs then a LightstreamerError subclass will be raised.



# File 'lib/lightstreamer/session.rb', line 117

def force_rebind
  control_request LS_op: :force_rebind if @stream_connection
end

#on_error(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when this session encounters an error on its processing thread caused by an error with the stream connection. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The argument passed to the block is |error|, which will be a LightstreamerError subclass detailing the error that occurred.

Parameters:

  • callback (Proc)

    The callback that is to be run.



# File 'lib/lightstreamer/session.rb', line 297

def on_error(&callback)
  @mutex.synchronize { @callbacks[:on_error] << callback }
end
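
Since the callback runs on a worker thread, one simple way to keep it thread-safe is to have it do nothing except push onto a Queue. The sketch below assumes only the #on_error method documented above; `collect_errors` is a hypothetical helper.

```ruby
# Hypothetical helper: registers an on_error callback that only pushes onto a
# Queue. Queue is thread-safe, so the worker thread never touches shared state
# directly and the caller can pop errors on whichever thread it likes.
def collect_errors(session)
  errors = Queue.new
  session.on_error { |error| errors << error }
  errors
end

# Usage (assumes `session` is a Lightstreamer::Session):
#
#   errors = collect_errors session
#   error = errors.pop # blocks until the session reports an error
```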

#on_message_result(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the outcome of one or more asynchronous message sends arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are |sequence, numbers, error|.

Parameters:

  • callback (Proc)

    The callback that is to be run.



# File 'lib/lightstreamer/session.rb', line 279

def on_message_result(&callback)
  @mutex.synchronize { @callbacks[:on_message_result] << callback }
end

#perform_subscription_actions(actions) ⇒ Array<LightstreamerError, nil>

This method takes an array of subscriptions and actions to perform on those subscriptions. The supported actions are :start, :unsilence and :stop. Calling Lightstreamer::Subscription#start, Lightstreamer::Subscription#unsilence or Lightstreamer::Subscription#stop on each subscription individually would also work but requires a separate POST request to be sent for each action, whereas this method performs all of the specified actions in a single POST request, which is significantly faster for a large number of actions. The return value is an array with one entry per action and indicates the error state returned by the server for that action, or nil if no error occurred. It will have the same number of entries as the passed actions array.

Parameters:

  • actions (Array<Hash>)

    This array describes the subscription actions to perform. Each entry must be a hash containing a :subscription key specifying the Lightstreamer::Subscription, and an :action key specifying the action to perform on the subscription (either :start, :unsilence or :stop). If :action is :start then an :options key can be specified, and the supported options are the same as for Lightstreamer::Subscription#start.

Returns:

  • (Array<LightstreamerError, nil>)



# File 'lib/lightstreamer/session.rb', line 223

def perform_subscription_actions(actions)
  request_bodies = actions.map do |hash|
    PostRequest.request_body hash.fetch(:subscription).control_request_options(hash.fetch(:action), hash[:options])
  end

  errors = PostRequest.execute_multiple control_request_url, request_bodies

  # Update the state of subscriptions that did not have an error
  errors.each_with_index do |error, index|
    actions[index][:subscription].after_control_request actions[index][:action] unless error
  end
end
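
As a sketch of how the actions array and the returned error array line up, the hypothetical helper below starts some subscriptions and stops others in one call, then reports which actions failed. It assumes only the hash keys and return value described above.

```ruby
# Hypothetical helper: starts some subscriptions and stops others in a single
# request, then returns [action hash, error] pairs for the actions that failed.
def swap_subscriptions(session, to_start:, to_stop:)
  actions = to_start.map { |subscription| { subscription: subscription, action: :start } } +
            to_stop.map { |subscription| { subscription: subscription, action: :stop } }

  errors = session.perform_subscription_actions actions

  # errors[i] belongs to actions[i]; nil means that action succeeded.
  actions.zip(errors).reject { |_action, error| error.nil? }
end
```

An empty return value from the helper means every action succeeded.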

#remove_subscription(subscription) ⇒ Object

Stops the specified subscription and removes it from this session. If an error occurs then a LightstreamerError subclass will be raised. To just stop a subscription with the option of restarting it at a later date call Lightstreamer::Subscription#stop on the subscription itself.

Parameters:

  • subscription (Subscription)

    The subscription to stop and remove from this session.



# File 'lib/lightstreamer/session.rb', line 151

def remove_subscription(subscription)
  errors = remove_subscriptions [subscription]

  raise errors.first if errors.first
end

#remove_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>

Stops the specified subscriptions and removes them from this session. To just stop subscriptions with the option of restarting them at a later date use #stop_subscriptions or Lightstreamer::Subscription#stop. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's stop request, or nil if no error occurred.

Parameters:

  • subscriptions (Array<Subscription>)

    The subscriptions to stop and remove from this session.

Returns:

  • (Array<LightstreamerError, nil>)



# File 'lib/lightstreamer/session.rb', line 165

def remove_subscriptions(subscriptions)
  errors = stop_subscriptions subscriptions

  @mutex.synchronize do
    subscriptions.reject(&:active).each do |subscription|
      @subscriptions.delete subscription
    end
  end

  errors
end

#send_message(message, options = {}) ⇒ Object

Sends a custom message to the Lightstreamer server. Message sending can be done synchronously or asynchronously. By default the message will be sent synchronously, i.e. the message will be processed by the server and if an error occurs a LightstreamerError subclass will be raised immediately. However, if the :async option is true then the message will be sent asynchronously, and the result of the message send will be reported to all callbacks that have been registered via #on_message_result. If :async is set to true then the :sequence and :number options must also be specified.

Parameters:

  • message (String)

    The message to send to the Lightstreamer server.

  • options (Hash) (defaults to: {})

    The options that control messages sent asynchronously.

Options Hash (options):

  • :async (Boolean)

    Whether to send the message asynchronously. Defaults to false.

  • :sequence (String)

    The alphanumeric identifier that identifies a subset of messages that are to be processed in sequence based on the :number given to them. If the special "UNORDERED_MESSAGES" sequence is used then the associated messages are processed immediately, possibly concurrently, with no ordering constraint.

  • :number (Integer)

    The progressive number of this message within its sequence. Should start at 1.

  • :max_wait (Float)

    The maximum time the server can wait before processing this message if one or more of the preceding messages in the same sequence have not been received. If not specified then a timeout is assigned by the server.



# File 'lib/lightstreamer/session.rb', line 263

def send_message(message, options = {})
  url = URI.join(@stream_connection.control_address, '/lightstreamer/send_message.txt').to_s

  query = { LS_session: session_id, LS_message: message }
  query[:LS_sequence] = options.fetch(:sequence) if options[:async]
  query[:LS_msg_prog] = options.fetch(:number) if options[:async]
  query[:LS_max_wait] = options[:max_wait] if options[:max_wait]

  PostRequest.execute url, query
end
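
The way the options map onto the request can be seen in isolation with a small stand-in that copies only the query construction from the source above. `message_query` is a hypothetical function written for this example; it sends nothing.

```ruby
# Hypothetical stand-in that copies the query construction from #send_message.
def message_query(session_id, message, options = {})
  query = { LS_session: session_id, LS_message: message }
  # fetch raises KeyError when an async send omits :sequence or :number
  query[:LS_sequence] = options.fetch(:sequence) if options[:async]
  query[:LS_msg_prog] = options.fetch(:number) if options[:async]
  query[:LS_max_wait] = options[:max_wait] if options[:max_wait]
  query
end

message_query 'S1', 'hello'
# => { LS_session: 'S1', LS_message: 'hello' }

message_query 'S1', 'hello', async: true, sequence: 'orders', number: 1
# => { LS_session: 'S1', LS_message: 'hello', LS_sequence: 'orders', LS_msg_prog: 1 }
```

Note that :sequence and :number are ignored unless :async is set, while :max_wait is passed through whenever it is given.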

#session_id ⇒ String?

Returns the ID of the currently active Lightstreamer session, or nil if there is no active session.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/session.rb', line 91

def session_id
  @stream_connection&.session_id
end

#start_subscriptions(subscriptions, options = {}) ⇒ Array<LightstreamerError, nil>

This method performs Lightstreamer::Subscription#start on all the passed subscriptions. Calling Lightstreamer::Subscription#start on each subscription individually would also work but requires a separate POST request to be sent for every subscription, whereas this method starts all of the passed subscriptions in a single POST request which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's start request, or nil if no error occurred.

Parameters:

  • subscriptions (Array<Subscription>)

    The subscriptions to start.

  • options (Hash) (defaults to: {})

    The options to start the subscriptions with. See Lightstreamer::Subscription#start for details on the supported options.

Returns:

  • (Array<LightstreamerError, nil>)



# File 'lib/lightstreamer/session.rb', line 188

def start_subscriptions(subscriptions, options = {})
  details = subscriptions.map { |subscription| { subscription: subscription, action: :start, options: options } }

  perform_subscription_actions details
end
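
Because the returned array is positional, pairing it back up with the subscriptions is usually the first step after a bulk start. A minimal sketch, with `start_and_partition` as a hypothetical helper:

```ruby
# Hypothetical helper: starts subscriptions in bulk and splits them into those
# that started and those the server rejected.
def start_and_partition(session, subscriptions, options = {})
  errors = session.start_subscriptions subscriptions, options

  # errors[i] belongs to subscriptions[i]; nil means the start succeeded.
  started, failed = subscriptions.zip(errors).partition { |_subscription, error| error.nil? }

  { started: started.map(&:first), failed: failed.to_h }
end
```

The :failed entry maps each rejected subscription to the error returned for it, which makes retrying or logging straightforward.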

#stop_subscriptions(subscriptions) ⇒ Array<LightstreamerError, nil>

This method performs Lightstreamer::Subscription#stop on all the passed subscriptions. Calling Lightstreamer::Subscription#stop on each subscription individually would also work but requires a separate POST request to be sent for every subscription, whereas this method stops all of the passed subscriptions in a single POST request which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription's stop request, or nil if no error occurred.

Parameters:

  • subscriptions (Array<Subscription>)

    The subscriptions to stop.

Returns:

  • (Array<LightstreamerError, nil>)



# File 'lib/lightstreamer/session.rb', line 203

def stop_subscriptions(subscriptions)
  details = subscriptions.map { |subscription| { subscription: subscription, action: :stop } }

  perform_subscription_actions details
end