Class: Lightstreamer::Subscription

Inherits:
Object
Defined in:
lib/lightstreamer/subscription.rb

Overview

Describes a subscription that can be bound to a Session in order to consume its streaming data. A subscription is described by the options passed to #initialize. Incoming data can be consumed by registering an asynchronous data callback using #on_data or by polling using #item_data. Subscriptions start receiving data once they are attached to a session using Lightstreamer::Session#subscribe.

Constructor Details

#initialize(options) ⇒ Subscription

Initializes a new Lightstreamer subscription with the specified options. This can then be passed to Lightstreamer::Session#subscribe to activate the subscription on a Lightstreamer session.

Parameters:

  • options (Hash)

    The options to create the subscription with.

Options Hash (options):

  • :items (Array)

    The names of the items to subscribe to. Required.

  • :fields (Array)

    The names of the fields to subscribe to on the items. Required.

  • :mode (:distinct, :merge)

    The operation mode of this subscription. Required.

  • :adapter (String)

    The name of the data adapter from the Lightstreamer session’s adapter set that should be used. If nil then the default data adapter will be used.

  • :selector (String)

    The selector for table items. Optional.

  • :maximum_update_frequency (Float, :unfiltered)

    The maximum number of updates this subscription should receive per second. Defaults to zero, which means there is no limit on the update frequency. If set to :unfiltered, unfiltered streaming is used for this subscription and overflows can occur (see #on_overflow).



# File 'lib/lightstreamer/subscription.rb', line 59

def initialize(options)
  @id = self.class.next_id

  @items = options.fetch(:items)
  @fields = options.fetch(:fields)
  @mode = options.fetch(:mode).to_sym
  @adapter = options[:adapter]
  @selector = options[:selector]
  @maximum_update_frequency = options[:maximum_update_frequency] || 0.0

  @data_mutex = Mutex.new

  clear_data
  clear_callbacks
end
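
The constructor above reads :items, :fields and :mode with Hash#fetch, so leaving one of them out raises KeyError, while the optional keys fall back to nil or 0.0. The following is a minimal sketch of that behaviour only. SketchSubscription is a hypothetical stand-in that copies just the option handling (it is not the gem's class), and the item and field names are made up.

```ruby
# Hypothetical stand-in that mirrors only the option handling shown above.
class SketchSubscription
  attr_reader :items, :fields, :mode, :adapter, :selector, :maximum_update_frequency

  def initialize(options)
    @items = options.fetch(:items)        # required: KeyError if missing
    @fields = options.fetch(:fields)      # required: KeyError if missing
    @mode = options.fetch(:mode).to_sym   # required; a String is converted to a Symbol
    @adapter = options[:adapter]          # optional, nil when not given
    @selector = options[:selector]        # optional, nil when not given
    @maximum_update_frequency = options[:maximum_update_frequency] || 0.0
  end
end

# Made-up item and field names, purely for illustration.
subscription = SketchSubscription.new(items: ['item1'], fields: %w(bid offer), mode: 'merge')
puts subscription.mode.inspect
puts subscription.maximum_update_frequency

begin
  SketchSubscription.new(items: ['item1'], fields: %w(bid offer)) # :mode is missing
rescue KeyError => e
  puts "Rejected: #{e.class}"
end
```

With the real class, the resulting object would then be handed to Lightstreamer::Session#subscribe as described above.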

Instance Attribute Details

#adapter ⇒ String? (readonly)

The name of the data adapter from the Lightstreamer session’s adapter set that should be used, or nil to use the default data adapter.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/subscription.rb', line 31

def adapter
  @adapter
end

#fields ⇒ Array (readonly)

The names of the fields to subscribe to on the items.

Returns:

  • (Array)


# File 'lib/lightstreamer/subscription.rb', line 20

def fields
  @fields
end

#id ⇒ Fixnum (readonly)

The unique identification number of this subscription.

Returns:

  • (Fixnum)


# File 'lib/lightstreamer/subscription.rb', line 10

def id
  @id
end

#items ⇒ Array (readonly)

The names of the items to subscribe to.

Returns:

  • (Array)


# File 'lib/lightstreamer/subscription.rb', line 15

def items
  @items
end

#maximum_update_frequency ⇒ Float, :unfiltered (readonly)

The maximum number of updates this subscription should receive per second. If this is set to zero, which is the default, then there is no limit on the update frequency. If set to :unfiltered then unfiltered streaming will be used for this subscription and it is possible for overflows to occur (see #on_overflow).

Returns:

  • (Float, :unfiltered)


# File 'lib/lightstreamer/subscription.rb', line 43

def maximum_update_frequency
  @maximum_update_frequency
end

#mode ⇒ :distinct, :merge (readonly)

The operation mode of this subscription.

Returns:

  • (:distinct, :merge)


# File 'lib/lightstreamer/subscription.rb', line 25

def mode
  @mode
end

#selector ⇒ String? (readonly)

The selector for table items.

Returns:

  • (String, nil)


# File 'lib/lightstreamer/subscription.rb', line 36

def selector
  @selector
end

Class Method Details

.next_id ⇒ Fixnum

Returns the next unique subscription ID.

Returns:

  • (Fixnum)


# File 'lib/lightstreamer/subscription.rb', line 143

def self.next_id
  @next_id ||= 0
  @next_id += 1
end
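
The counter above lives in a class-level instance variable that is created lazily on the first call, so IDs start at 1 and increase by one per call. A minimal sketch of the same pattern follows, using a hypothetical SketchCounter class. Note that, as in the listing above, the increment is not wrapped in a mutex, so this sketch assumes IDs are requested from a single thread.

```ruby
# Hypothetical stand-in demonstrating the lazy class-level counter pattern.
class SketchCounter
  def self.next_id
    @next_id ||= 0   # class-level instance variable, initialized on first use
    @next_id += 1    # the value of the assignment is the method's return value
  end
end

first_id = SketchCounter.next_id
second_id = SketchCounter.next_id
puts "#{first_id}, #{second_id}"
```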

Instance Method Details

#clear_callbacks ⇒ Object

Removes all #on_data and #on_overflow callbacks present on this subscription.



# File 'lib/lightstreamer/subscription.rb', line 105

def clear_callbacks
  @data_mutex.synchronize do
    @callbacks = { on_data: [], on_overflow: [] }
  end
end

#clear_data ⇒ Object

Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.



# File 'lib/lightstreamer/subscription.rb', line 77

def clear_data
  @data = (0...items.size).map { {} }
end

#item_data(item_name) ⇒ Hash

Returns a copy of the current data of one of this subscription’s items.

Parameters:

  • item_name (String)

    The name of the item to return the current data for.

Returns:

  • (Hash)

    A copy of the item data.

Raises:

  • (ArgumentError)


# File 'lib/lightstreamer/subscription.rb', line 116

def item_data(item_name)
  index = @items.index item_name
  raise ArgumentError, 'Unknown item' unless index

  @data_mutex.synchronize do
    @data[index].dup
  end
end
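
Because #item_data returns a copy made with dup while holding the mutex, a caller can modify the returned hash without affecting the stored data, and an unknown item name raises ArgumentError. The sketch below illustrates both points with a hypothetical SketchItemStore class. Its set method exists only to seed data for the example and has no counterpart in the listing above. Hash#dup is a shallow copy, so the value objects themselves are still shared.

```ruby
# Hypothetical stand-in that reproduces only the lookup-and-copy logic of #item_data.
class SketchItemStore
  def initialize(items)
    @items = items
    @data = items.map { {} }   # one hash of field values per item
    @data_mutex = Mutex.new
  end

  # Helper for this sketch only: stores a field value for a known item.
  def set(item_name, field, value)
    @data_mutex.synchronize { @data[@items.index(item_name)][field] = value }
  end

  def item_data(item_name)
    index = @items.index(item_name)
    raise ArgumentError, 'Unknown item' unless index

    @data_mutex.synchronize { @data[index].dup }
  end
end

store = SketchItemStore.new(%w(item1 item2))
store.set('item1', :bid, '1.25')

copy = store.item_data('item1')
copy[:bid] = 'changed'                 # modifies the copy only
puts store.item_data('item1')[:bid]    # the stored value is unaffected

begin
  store.item_data('missing')
rescue ArgumentError => e
  puts e.message
end
```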

#on_data(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name, item_data, new_values|`. If #mode is :distinct then the values of item_data and new_values will be the same.

Parameters:

  • callback (Proc)

    The callback that is to be run when new data arrives.



# File 'lib/lightstreamer/subscription.rb', line 87

def on_data(&callback)
  @data_mutex.synchronize do
    @callbacks[:on_data] << callback
  end
end
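
Since the block runs on a worker thread, one simple way to keep it thread-safe is to have it do nothing except push onto a Queue and to consume that queue from the application's own thread. The following is a sketch under that assumption: queueing_callback is a hypothetical helper, the item and field values are made up, and the worker thread is simulated here rather than provided by a session.

```ruby
# Hypothetical helper: builds a block suitable for #on_data. The block only
# pushes onto a Queue, which is safe to use from multiple threads.
def queueing_callback(queue)
  proc do |_subscription, item_name, item_data, new_values|
    queue << { item: item_name, data: item_data, changed: new_values }
  end
end

updates = Queue.new
callback = queueing_callback(updates)

# Simulates the worker thread invoking the callback. With the real class the
# block would instead be registered with subscription.on_data(&callback).
worker = Thread.new do
  callback.call(nil, 'item1', { bid: '1.25', offer: '1.27' }, { bid: '1.25' })
end
worker.join

update = updates.pop   # consumed on the calling thread
puts "#{update[:item]} received #{update[:changed].size} changed field(s)"
```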

#on_overflow(&callback) ⇒ Object

Adds the passed block to the list of callbacks that will be run when the server reports an overflow for this subscription. The block will be called on a worker thread and so the code that is run by the block must be thread-safe. The arguments passed to the block are `|subscription, item_name, overflow_size|`.

Parameters:

  • callback (Proc)

    The callback that is to be run when an overflow is reported for this subscription.



# File 'lib/lightstreamer/subscription.rb', line 98

def on_overflow(&callback)
  @data_mutex.synchronize do
    @callbacks[:on_overflow] << callback
  end
end
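
An overflow callback that keeps shared state needs its own synchronization, because it runs on a worker thread. As a sketch, the hypothetical OverflowTally class below guards a per-item total with a Mutex. Its record method takes the same three arguments as an #on_overflow block, and the threads here merely simulate concurrent invocations.

```ruby
# Hypothetical helper: tallies overflow sizes per item behind a Mutex so it
# can be updated from a worker thread and read from another thread.
class OverflowTally
  def initialize
    @mutex = Mutex.new
    @totals = Hash.new(0)
  end

  # Takes the same arguments as an #on_overflow block.
  def record(_subscription, item_name, overflow_size)
    @mutex.synchronize { @totals[item_name] += overflow_size }
  end

  def total_for(item_name)
    @mutex.synchronize { @totals[item_name] }
  end
end

tally = OverflowTally.new

# Simulated concurrent invocations. With the real class, registration could
# look like: subscription.on_overflow { |*args| tally.record(*args) }
threads = 4.times.map do
  Thread.new { 100.times { tally.record(nil, 'item1', 1) } }
end
threads.each(&:join)

puts tally.total_for('item1')
```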

#process_stream_data(line) ⇒ Boolean

Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.

Parameters:

  • line (String)

    The line of stream data to process.

Returns:

  • (Boolean)

    Whether the passed line of stream data was relevant to this subscription and was successfully processed by it.



# File 'lib/lightstreamer/subscription.rb', line 134

def process_stream_data(line)
  process_update_message(line) || process_overflow_message(line)
end
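
The method above relies on short-circuit evaluation: the overflow handler is tried only when the update handler returns a falsy value, and the overall result is truthy if either handler accepted the line. The two handlers are not shown on this page, so the sketch below invents them. SketchLineProcessor, its private methods and the "update:" / "overflow:" line formats are all hypothetical and do not reflect the actual Lightstreamer stream format.

```ruby
# Hypothetical stand-in: the class name, both private handlers and the
# "update:" / "overflow:" line formats are invented for this sketch.
class SketchLineProcessor
  attr_reader :updates, :overflows

  def initialize
    @updates = []
    @overflows = []
  end

  # Same shape as the method above: try the update handler first and fall
  # back to the overflow handler only if the first one returns false.
  def process_stream_data(line)
    process_update_message(line) || process_overflow_message(line)
  end

  private

  def process_update_message(line)
    return false unless line.start_with?('update:')

    @updates << line.sub('update:', '')
    true
  end

  def process_overflow_message(line)
    return false unless line.start_with?('overflow:')

    @overflows << Integer(line.sub('overflow:', ''))
    true
  end
end

processor = SketchLineProcessor.new
results = ['update:bid=1.25', 'overflow:3', 'PROBE'].map do |line|
  processor.process_stream_data(line)
end
p results   # [true, true, false]
```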