Class: Lightstreamer::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/lightstreamer/session.rb

Overview

This class is responsible for managing a Lightstreamer session, and along with the Subscription class forms the primary API for working with Lightstreamer.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Session

Initializes this new Lightstreamer session with the passed options.

Parameters:

  • options (Hash) (defaults to: {})

    The options to create the session with.

Options Hash (options):

  • :server_url (String)

    The URL of the Lightstreamer server. Required.

  • :username (String)

    The username to connect to the server with.

  • :password (String)

    The password to connect to the server with.

  • :adapter_set (String)

    The name of the adapter set to request from the server.

  • :requested_maximum_bandwidth. (Float)

    The server-side bandwidth constraint on data usage, expressed in kbps. Defaults to zero which means no limit is applied.



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lightstreamer/session.rb', line 46

def initialize(options = {})
  @subscriptions = []
  @subscriptions_mutex = Mutex.new

  @server_url = options.fetch :server_url
  @username = options[:username]
  @password = options[:password]
  @adapter_set = options[:adapter_set]
  @requested_maximum_bandwidth = options[:requested_maximum_bandwidth].to_f

  @on_message_result_callbacks = []
end

Instance Attribute Details

#adapter_setString? (readonly)

The name of the adapter set to request from the Lightstreamer server. Set by #initialize.

Returns:

  • (String, nil)


23
24
25
# File 'lib/lightstreamer/session.rb', line 23

def adapter_set
  @adapter_set
end

#errorLightstreamerError? (readonly)

If an error occurs on the stream connection that causes the session to terminate then details of the error will be stored in this attribute. If the session is terminated as a result of calling #disconnect then the error will be Errors::SessionEndError.

Returns:



30
31
32
# File 'lib/lightstreamer/session.rb', line 30

def error
  @error
end

#passwordString? (readonly)

The password to connect to the Lightstreamer server with. Set by #initialize.

Returns:

  • (String, nil)


18
19
20
# File 'lib/lightstreamer/session.rb', line 18

def password
  @password
end

#requested_maximum_bandwidthFloat

The server-side bandwidth constraint on data usage, expressed in kbps. If this is zero then no limit is applied.

Returns:

  • (Float)


35
36
37
# File 'lib/lightstreamer/session.rb', line 35

def requested_maximum_bandwidth
  @requested_maximum_bandwidth
end

#server_urlString (readonly)

The URL of the Lightstreamer server to connect to. Set by #initialize.

Returns:

  • (String)


8
9
10
# File 'lib/lightstreamer/session.rb', line 8

def server_url
  @server_url
end

#usernameString? (readonly)

The username to connect to the Lightstreamer server with. Set by #initialize.

Returns:

  • (String, nil)


13
14
15
# File 'lib/lightstreamer/session.rb', line 13

def username
  @username
end

Instance Method Details

#build_subscription(options) ⇒ Subscription

Builds a new subscription for this session with the specified options. Note that ths does not activate the subscription, Lightstreamer::Subscription#start must be called to actually start streaming the subscription’s data. See the Lightstreamer::Subscription class for more details.

Parameters:

  • options (Hash)

    The options to create the subscription with.

Options Hash (options):

  • :items (Array)

    The names of the items to subscribe to. Required.

  • :fields (Array)

    The names of the fields to subscribe to on the items. Required.

  • :mode (:distinct, :merge)

    The operation mode of the subscription. Required.

  • :adapter (String)

    The name of the data adapter from this session’s adapter set that should be used. If nil then the default data adapter will be used.

  • :selector (String)

    The selector for table items. Optional.

  • :maximum_update_frequency (Float, :unfiltered)

    The maximum number of updates the subscription should receive per second. Defaults to zero which means there is no limit on the update frequency. If set to :unfiltered then unfiltered streaming will be used for the subscription and it is possible for overflows to occur (see Lightstreamer::Subscription#on_overflow).

Returns:



132
133
134
135
136
137
138
# File 'lib/lightstreamer/session.rb', line 132

def build_subscription(options)
  subscription = Subscription.new self, options

  @subscriptions_mutex.synchronize { @subscriptions << subscription }

  subscription
end

#bulk_subscription_start(*subscriptions) ⇒ Array<LightstreamerError, nil>

This method performs a bulk Lightstreamer::Subscription#start on all the passed subscriptions. Calling Lightstreamer::Subscription#start on each of them individually would also work but requires a separate POST request to be sent for every subscription. This request starts all of the passed subscriptions in a single POST request which is significantly faster for a large number of subscriptions. The return value is an array with one entry per subscription and indicates the error state returned by the server for that subscription’s start request, or nil if no error occurred.

Parameters:

  • subscriptions (Array<Subscription>)

    The subscriptions to start.

Returns:



158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/lightstreamer/session.rb', line 158

def bulk_subscription_start(*subscriptions)
  request_bodies = subscriptions.map do |subscription|
    PostRequest.request_body session_id, *subscription.start_control_request_args
  end

  errors = PostRequest.bulk_execute @stream_connection.control_address, request_bodies

  # Set @active to true on all subscriptions that did not have an error
  errors.each_with_index do |error, index|
    subscriptions[index].instance_variable_set :@active, true if error.nil?
  end
end

#connectObject

Connects a new Lightstreamer session using the details passed to #initialize. If an error occurs then a LightstreamerError subclass will be raised.



61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/lightstreamer/session.rb', line 61

def connect
  return if @stream_connection

  @error = nil

  @stream_connection = StreamConnection.new self
  @stream_connection.connect

  create_processing_thread
rescue
  @stream_connection = nil
  raise
end

#connected?Boolean

Returns whether this Lightstreamer session is currently connected and has an active stream connection.

Returns:

  • (Boolean)


78
79
80
# File 'lib/lightstreamer/session.rb', line 78

def connected?
  !@stream_connection.nil?
end

#control_request(operation, options = {}) ⇒ Object

Sends a request to the control connection. If an error occurs then a LightstreamerError subclass will be raised.

Parameters:

  • operation (Symbol)

    The control operation to perform.

  • options (Hash) (defaults to: {})

    The options to send with the control request.



221
222
223
224
225
# File 'lib/lightstreamer/session.rb', line 221

def control_request(operation, options = {})
  url = URI.join(@stream_connection.control_address, '/lightstreamer/control.txt').to_s

  PostRequest.execute url, options.merge(LS_session: session_id, LS_op: operation)
end

#disconnectObject

Disconnects this Lightstreamer session and terminates the session on the server. All worker threads are exited.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/lightstreamer/session.rb', line 90

def disconnect
  control_request :destroy if @stream_connection

  @processing_thread.join 5 if @processing_thread
ensure
  @stream_connection.disconnect if @stream_connection
  @processing_thread.exit if @processing_thread

  @subscriptions.each do |subscription|
    subscription.instance_variable_set :@active, false
  end

  @processing_thread = @stream_connection = nil
end

#force_rebindObject

Requests that the Lightstreamer server terminate the currently active stream connection and require that a new stream connection be initiated by the client. The Lightstreamer server requires closure and re-establishment of the stream connection periodically during normal operation, this method just allows such a reconnection to be requested explicitly by the client. If an error occurs then a LightstreamerError subclass will be raised.



109
110
111
112
113
# File 'lib/lightstreamer/session.rb', line 109

def force_rebind
  return unless @stream_connection

  control_request :force_rebind
end

#on_message_result(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the outcome of an asynchronous message send arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are ‘|sequence, numbers, error|`.

Parameters:

  • callback (Proc)

    The callback that is to be run.



213
214
215
# File 'lib/lightstreamer/session.rb', line 213

def on_message_result(&callback)
  @on_message_result_callbacks << callback
end

#remove_subscription(subscription) ⇒ Object

Stops the specified subscription and removes it from this session. If an error occurs then a LightstreamerError subclass will be raised. To just stop a subscription with the option of restarting it at a later date call Lightstreamer::Subscription#stop on the subscription itself.



143
144
145
146
147
# File 'lib/lightstreamer/session.rb', line 143

def remove_subscription(subscription)
  subscription.stop

  @subscriptions_mutex.synchronize { @subscriptions.delete subscription }
end

#send_message(message, options = {}) ⇒ Object

Sends a custom message to the Lightstreamer server. Message sending can be done synchronously or asynchronously. By default the message will be sent synchronously, i.e. the message will be processed by the server and if an error occurs a LightstreamerError subclass will be raised immediately. However, if the :async option is true then the message will be sent asynchronously, and the result of the message send will be reported to all callbacks that have been registered via #on_message_result.

Parameters:

  • message (String)

    The message to send to the Lightstreamer server.

  • options (Hash) (defaults to: {})

    The options that control messages sent asynchronously.

Options Hash (options):

  • :async (Boolean)

    Whether to send the message asynchronously. Defaults to false.

  • :sequence (String)

    The alphanumeric identifier that identifies a subset of messages that are to be processed in sequence based on the :number given to them. If the special UNORDERED_MESSAGES sequence is used then the associated messages are processed immediately, possibly concurrently, with no ordering constraint.

  • :number (Fixnum)

    The progressive number of this message within its sequence. Should start at 1.

  • :max_wait (Float)

    The maximum time the server can wait before processing this message if one or more of the preceding messages in the same sequence have not been received. If not specified then a timeout is assigned by the server.



197
198
199
200
201
202
203
204
205
206
# File 'lib/lightstreamer/session.rb', line 197

def send_message(message, options = {})
  url = URI.join(@stream_connection.control_address, '/lightstreamer/send_message.txt').to_s

  query = { LS_session: session_id, LS_message: message }
  query[:LS_sequence] = options.fetch(:sequence) if options[:async]
  query[:LS_msg_prog] = options.fetch(:number) if options[:async]
  query[:LS_max_wait] = options[:max_wait] if options[:max_wait]

  PostRequest.execute url, query
end

#session_idString?

Returns the ID of the currently active Lightstreamer session, or nil if there is no active session.

Returns:

  • (String, nil)


85
86
87
# File 'lib/lightstreamer/session.rb', line 85

def session_id
  @stream_connection && @stream_connection.session_id
end