Class: Lightstreamer::Subscription

Inherits:
Object
Defined in:
lib/lightstreamer/subscription.rb

Overview

Describes a subscription that can be bound to a Lightstreamer session in order to consume its data. A subscription is described by the options passed to #initialize. Incoming data can be consumed by registering an asynchronous data callback using #add_data_callback, or by polling #retrieve_item_data. Subscriptions start receiving data only once they are attached to a Lightstreamer session using Lightstreamer::Session#subscribe.

Constructor Details

#initialize(options) ⇒ Subscription

Initializes a new Lightstreamer subscription with the specified options. This can then be passed to Lightstreamer::Session#subscribe to activate the subscription on a Lightstreamer session.

Parameters:

  • options (Hash)

    The options to create the subscription with.

Options Hash (options):

  • :items (Array)

    The names of the items to subscribe to.

  • :fields (Array)

    The names of the fields to subscribe to on the items.

  • :mode (:distinct, :merge)

    The operation mode of this subscription.

  • :adapter (String)

    The name of the data adapter from the Lightstreamer session’s adapter set that should be used. If nil then the default data adapter will be used.



# File 'lib/lightstreamer/subscription.rb', line 33

def initialize(options)
  @id = self.class.next_id

  @items = Array(options.fetch(:items))
  @fields = Array(options.fetch(:fields))
  @mode = options.fetch(:mode).to_sym
  @adapter = options[:adapter]

  @data_mutex = Mutex.new
  clear_data

  @data_callbacks = []
end
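
As a rough illustration of how the constructor above normalizes its options, the following sketch applies the same `Array(...)`, `fetch` and `to_sym` calls to a plain Hash. The helper name `normalize_subscription_options` and the item and field names are hypothetical and are not part of the library.

```ruby
# Hypothetical helper mirroring the option handling in #initialize.
def normalize_subscription_options(options)
  {
    items: Array(options.fetch(:items)),   # a single name becomes a one-element Array
    fields: Array(options.fetch(:fields)),
    mode: options.fetch(:mode).to_sym,     # accepts 'merge' as well as :merge
    adapter: options[:adapter]             # optional, nil when omitted
  }
end

normalized = normalize_subscription_options(items: 'item1', fields: %w[bid ask], mode: 'merge')
p normalized

begin
  # :mode is read with fetch, so leaving it out raises KeyError.
  normalize_subscription_options(items: ['item1'], fields: ['bid'])
rescue KeyError => e
  puts "missing option: #{e.message}"
end
```

Because `:items`, `:fields` and `:mode` are read with `fetch`, omitting any of them raises `KeyError`, whereas `:adapter` may be left out.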

Instance Attribute Details

#adapter ⇒ String (readonly)

Returns the name of the data adapter from the Lightstreamer session’s adapter set that should be used. If nil then the default data adapter will be used.

Returns:

  • (String)

    The name of the data adapter from the Lightstreamer session’s adapter set that should be used. If nil then the default data adapter will be used.



# File 'lib/lightstreamer/subscription.rb', line 22

def adapter
  @adapter
end

#fields ⇒ Array (readonly)

Returns the names of the fields to subscribe to on the items.

Returns:

  • (Array)

    The names of the fields to subscribe to on the items.



# File 'lib/lightstreamer/subscription.rb', line 15

def fields
  @fields
end

#id ⇒ Fixnum (readonly)

Returns the unique identification number of this subscription. This is used to identify the subscription in incoming Lightstreamer data.

Returns:

  • (Fixnum)

    The unique identification number of this subscription. This is used to identify the subscription in incoming Lightstreamer data.



# File 'lib/lightstreamer/subscription.rb', line 9

def id
  @id
end

#items ⇒ Array (readonly)

Returns the names of the items to subscribe to.

Returns:

  • (Array)

    The names of the items to subscribe to.



# File 'lib/lightstreamer/subscription.rb', line 12

def items
  @items
end

#mode ⇒ :distinct, :merge (readonly)

Returns the operation mode of this subscription.

Returns:

  • (:distinct, :merge)

    The operation mode of this subscription.



# File 'lib/lightstreamer/subscription.rb', line 18

def mode
  @mode
end

Class Method Details

.next_id ⇒ Fixnum

Returns the next unique ID to use for a new subscription.

Returns:

  • (Fixnum)


# File 'lib/lightstreamer/subscription.rb', line 130

def self.next_id
  @next_id ||= 0
  @next_id += 1
end
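
The counter above is stored in an instance variable on the class object itself, so all subscriptions draw their IDs from the same sequence. A minimal stand-alone sketch of that pattern, using a hypothetical `Ticket` class rather than the library's own code:

```ruby
# Hypothetical class demonstrating the class-level counter pattern.
class Ticket
  attr_reader :id

  def initialize
    @id = self.class.next_id
  end

  def self.next_id
    @next_id ||= 0 # lazily initialized on first use
    @next_id += 1  # the value of the assignment is the method's return value
  end
end

first = Ticket.new
second = Ticket.new
puts first.id
puts second.id
```

Note that `||=` followed by `+=` is not atomic, so this sketch assumes IDs are allocated from one thread at a time.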

Instance Method Details

#add_data_callback(&block) ⇒ Proc

Adds the passed block to the list of callbacks that will be run when new data for this subscription arrives. The block is called on a worker thread, so the code it runs must be thread-safe. The arguments passed to the block are `|subscription, item_name, item_data, new_values|`.

Parameters:

  • block (Proc)

    The callback block that is to be run when new data arrives.

Returns:

  • (Proc)

    The same Proc object that was passed to this method. This can be used to remove this data callback at a later stage using #remove_data_callback.



# File 'lib/lightstreamer/subscription.rb', line 76

def add_data_callback(&block)
  @data_mutex.synchronize { @data_callbacks << block }

  block
end
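
To show how the returned Proc can serve as a handle for later removal, here is a small sketch built around a hypothetical `CallbackList` class. It copies only the bookkeeping shown above. The `run` method is an assumption about how callbacks might be invoked, since the library's own dispatch code is not reproduced on this page.

```ruby
# Hypothetical stand-in that mimics only the callback bookkeeping.
class CallbackList
  def initialize
    @mutex = Mutex.new
    @callbacks = []
  end

  # Stores the block and returns it so the caller can remove it later.
  def add(&block)
    @mutex.synchronize { @callbacks << block }
    block
  end

  def remove(block)
    @mutex.synchronize { @callbacks.delete block }
  end

  # Assumed dispatch: calls every registered block with the four documented arguments.
  def run(subscription, item_name, item_data, new_values)
    @mutex.synchronize { @callbacks.dup }.each do |callback|
      callback.call subscription, item_name, item_data, new_values
    end
  end
end

list = CallbackList.new
seen = []
handle = list.add { |_subscription, item_name, _item_data, new_values| seen << [item_name, new_values] }

list.run(nil, 'item1', { bid: '1.0' }, { bid: '1.0' })
list.remove(handle)
list.run(nil, 'item1', { bid: '1.1' }, { bid: '1.1' }) # no longer delivered

p seen
```

Keeping the return value of `add` is what makes removal possible, because the list is searched for that same Proc object.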

#clear_data ⇒ Object

Clears all current data stored for this subscription. New data will continue to be processed as it becomes available.



# File 'lib/lightstreamer/subscription.rb', line 49

def clear_data
  @data_mutex.synchronize do
    @data = (0...items.size).map { { distinct: [], merge: {} }.fetch(mode) }
  end
end
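
The expression in the method body above builds one empty container per item: an Array in :distinct mode and a Hash in :merge mode. A minimal sketch of just that expression, wrapped in a hypothetical helper named `empty_item_data`:

```ruby
# Hypothetical helper reproducing the expression used by #clear_data.
def empty_item_data(item_count, mode)
  # The Hash literal is rebuilt on every block call, so each item gets its own
  # container instead of sharing a single object.
  (0...item_count).map { { distinct: [], merge: {} }.fetch(mode) }
end

distinct_data = empty_item_data(2, :distinct)
merge_data = empty_item_data(2, :merge)

p distinct_data
p merge_data
p distinct_data[0].equal?(distinct_data[1])
```

Since the lookup uses `fetch`, a mode other than :distinct or :merge raises `KeyError` in this sketch.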

#clear_data_for_item(item_name) ⇒ Object

Clears the current data stored for the specified item. Call this regularly when #mode is :distinct, because in that mode every update is appended to the item’s data, which otherwise grows without bound.

Parameters:

  • item_name (String)

    The name of the item to clear the current data for.

Raises:

  • (ArgumentError)

    If item_name is not one of this subscription’s items.


# File 'lib/lightstreamer/subscription.rb', line 59

def clear_data_for_item(item_name)
  index = @items.index item_name
  raise ArgumentError, 'Unrecognized item name' unless index

  @data_mutex.synchronize do
    @data[index] = { distinct: [], merge: {} }.fetch(mode)
  end
end

#process_stream_data(line) ⇒ Boolean

Processes a line of stream data if it is relevant to this subscription. This method is thread-safe and is intended to be called by the session’s processing thread.

Parameters:

  • line (String)

    The line of stream data to process.

Returns:

  • (Boolean)

    Whether the passed line of stream data was relevant to this subscription and was successfully processed by it.



# File 'lib/lightstreamer/subscription.rb', line 111

def process_stream_data(line)
  item_index, new_values = parse_stream_data line
  return false unless item_index

  @data_mutex.synchronize do
    data = @data[item_index]

    data << new_values if mode == :distinct
    data.merge!(new_values) if mode == :merge

    call_data_callbacks @items[item_index], data, new_values
  end

  true
end
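
The two branches inside the synchronized block above are what distinguish the modes: :distinct appends each update, while :merge folds it into a single Hash. The sketch below isolates that step in a hypothetical `apply_update` helper. The field names and the use of Symbol keys are assumptions, since the parsing code is not shown on this page.

```ruby
# Hypothetical helper applying one update the way #process_stream_data does.
def apply_update(data, new_values, mode)
  data << new_values if mode == :distinct
  data.merge!(new_values) if mode == :merge
  data
end

distinct_data = []
merge_data = {}

# Two assumed updates for the same item; the second changes only :bid.
[{ bid: '1.0', ask: '1.2' }, { bid: '1.1' }].each do |new_values|
  apply_update(distinct_data, new_values, :distinct)
  apply_update(merge_data, new_values, :merge)
end

p distinct_data
p merge_data
```

In :distinct mode the Array gains one entry per update, which is why clearing an item's data periodically matters there. In :merge mode the Hash only ever holds the latest value of each field.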

#remove_data_callback(block) ⇒ Object

Removes a data callback that was added by #add_data_callback.

Parameters:

  • block (Proc)

    The data callback block to remove.



# File 'lib/lightstreamer/subscription.rb', line 85

def remove_data_callback(block)
  @data_mutex.synchronize { @data_callbacks.delete block }
end

#retrieve_item_data(item_name) ⇒ Hash, Array

Returns the current data of one of this subscription’s items.

Parameters:

  • item_name (String)

    The name of the item to return the current data for.

Returns:

  • (Hash, Array)

    The item data. Will be a Hash if #mode is :merge, and an Array if #mode is :distinct.

Raises:

  • (ArgumentError)

    If item_name is not one of this subscription’s items.


# File 'lib/lightstreamer/subscription.rb', line 95

def retrieve_item_data(item_name)
  index = @items.index item_name
  raise ArgumentError, 'Unrecognized item name' unless index

  @data_mutex.synchronize do
    @data[index].dup
  end
end
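
Because the method above returns `@data[index].dup`, callers receive a shallow copy rather than the internal object. The following sketch, built around a hypothetical `ItemStore` class with assumed sample data, illustrates what that does and does not protect against:

```ruby
# Hypothetical stand-in that mimics only the lookup in #retrieve_item_data.
class ItemStore
  def initialize(items, data)
    @items = items
    @data = data
    @mutex = Mutex.new
  end

  def retrieve(item_name)
    index = @items.index item_name
    raise ArgumentError, 'Unrecognized item name' unless index

    @mutex.synchronize { @data[index].dup }
  end
end

# One item in :distinct style: an Array holding one update so far.
store = ItemStore.new(['item1'], [[{ bid: '1.0' }]])

snapshot = store.retrieve('item1')
snapshot << { bid: '9.9' }       # changes only the caller's copy
p store.retrieve('item1').size

snapshot.first[:bid] = 'changed' # nested objects are shared with the stored data
p store.retrieve('item1').first
```

Adding to or removing from the returned container is therefore safe, but objects nested inside it should be treated as read-only.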