Class: SQA::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/sqa/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ticker:, window_size: 100, strategies: []) ⇒ Stream

Returns a new instance of Stream.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/sqa/stream.rb', line 42

# Builds an empty stream for a single ticker.
#
# @param ticker [Object] identifier stored as-is in @ticker
# @param window_size [Integer] stored as-is in @window_size
#   (presumably the cap applied by trim_buffers — confirm against that helper)
# @param strategies [Object, Array] one strategy or a list of strategies;
#   normalised to an Array with Kernel#Array
# @return [Stream] a new instance of Stream
def initialize(ticker:, window_size: 100, strategies: [])
  @ticker = ticker
  @window_size = window_size
  # Array() hands back the caller's own array when it is given one. Copy it so
  # that #add_strategy never mutates an object the caller still holds.
  @strategies = Array(strategies).dup
  @signal_callbacks = []
  @update_callbacks = []

  # State guarded by @mutex
  @mutex = Mutex.new
  @data_buffer = {
    prices: [],
    volumes: [],
    highs: [],
    lows: [],
    timestamps: []
  }

  @indicator_cache = {}
  @last_signal = :hold
  @update_count = 0
end

Instance Attribute Details

#data_buffer ⇒ Object (readonly)

Returns the value of attribute data_buffer.



40
41
42
# File 'lib/sqa/stream.rb', line 40

# Returns the Hash held in @data_buffer.
#
# The constructor builds it with the keys :prices, :volumes, :highs, :lows and
# :timestamps, each mapping to an Array. The object is handed back directly
# (no copy is made) and the read does not take @mutex.
def data_buffer
  @data_buffer
end

#indicator_cache ⇒ Object (readonly)

Returns the value of attribute indicator_cache.



40
41
42
# File 'lib/sqa/stream.rb', line 40

# Returns the Hash held in @indicator_cache.
#
# The object is handed back directly (no copy is made) and the read does not
# take @mutex. #update and #reset both empty this Hash.
def indicator_cache
  @indicator_cache
end

#strategies ⇒ Object (readonly)

Returns the value of attribute strategies.



40
41
42
# File 'lib/sqa/stream.rb', line 40

# Returns the Array held in @strategies.
#
# The object is handed back directly (no copy is made) and the read does not
# take @mutex; #add_strategy appends to this same Array.
def strategies
  @strategies
end

#ticker ⇒ Object (readonly)

Returns the value of attribute ticker.



40
41
42
# File 'lib/sqa/stream.rb', line 40

# Returns the ticker given to the constructor, exactly as it was passed in.
def ticker
  @ticker
end

#window_size ⇒ Object (readonly)

Returns the value of attribute window_size.



40
41
42
# File 'lib/sqa/stream.rb', line 40

# Returns the window size given to the constructor (100 when not specified).
def window_size
  @window_size
end

Instance Method Details

#add_strategy(strategy) ⇒ Object

Add a strategy to the stream processor



65
66
67
68
69
70
# File 'lib/sqa/stream.rb', line 65

# Appends a strategy to the stream's strategy list while holding the lock.
#
# @param strategy [Object] the strategy to register; stored unchanged
# @return [self] the receiver, so calls can be chained
def add_strategy(strategy)
  @mutex.synchronize { @strategies.push(strategy) }
  self
end

#current_price ⇒ Object

Get current price



136
137
138
# File 'lib/sqa/stream.rb', line 136

# Reads the most recently buffered price under the lock.
#
# @return [Object, nil] the last element of the :prices buffer, or nil when
#   the buffer is empty
def current_price
  @mutex.synchronize do
    prices = @data_buffer[:prices]
    prices[-1]
  end
end

#current_signal ⇒ Object

Get current trading signal from last strategy execution



177
178
179
# File 'lib/sqa/stream.rb', line 177

# Returns the value held in @last_signal.
#
# The constructor and #reset both set it to :hold.
# NOTE(review): unlike #current_price, this read does not take @mutex —
# confirm that is intentional.
def current_signal
  @last_signal
end

#indicator(name, **options) ⇒ Object

Calculate or retrieve cached indicator

Example:

rsi = stream.indicator(:rsi, period: 14)
sma = stream.indicator(:sma, period: 20)


159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/sqa/stream.rb', line 159

# Returns the indicator identified by +name+ and +options+, computing it via
# calculate_indicator on first use and serving it from @indicator_cache
# afterwards. The lookup and the computation both run while holding @mutex.
#
# @param name [Object] indicator identifier, forwarded to calculate_indicator
# @param options [Hash] keyword options, forwarded to calculate_indicator
# @return [Object] whatever calculate_indicator returned for this request
def indicator(name, **options)
  # Key on the request itself rather than on its Integer #hash: two different
  # requests may share a hash value, and an Integer key would then silently
  # serve one request's result for the other.
  cache_key = [name, options]

  @mutex.synchronize do
    # fetch only runs the block when the key is absent, so a cached nil or
    # false result is still served from the cache.
    @indicator_cache.fetch(cache_key) do
      # Hand copies to the calculation so it cannot mutate the live buffers.
      prices = @data_buffer[:prices].dup
      volumes = @data_buffer[:volumes].dup
      highs = @data_buffer[:highs].dup
      lows = @data_buffer[:lows].dup

      @indicator_cache[cache_key] =
        calculate_indicator(name, prices, volumes, highs, lows, **options)
    end
  end
end

#on_signal(&block) ⇒ Object

Register callback for trading signals

Example:

stream.on_signal do |signal, data|
  puts "#{signal.upcase} at $#{data[:price]}"
end


78
79
80
81
# File 'lib/sqa/stream.rb', line 78

# Registers a callback for trading signals.
#
# @yield the block to store; the documented usage is +|signal, data|+
# @return [self] the receiver, so calls can be chained
# @raise [ArgumentError] when called without a block
def on_signal(&block)
  # Without this guard a nil would be stored in the callback list and only
  # fail later, when the callbacks are invoked.
  raise ArgumentError, "on_signal requires a block" unless block

  @signal_callbacks << block
  self
end

#on_update(&block) ⇒ Object

Register callback for price updates

Example:

stream.on_update do |data|
  puts "Price updated: $#{data[:price]}"
end


89
90
91
92
# File 'lib/sqa/stream.rb', line 89

# Registers a callback for price updates.
#
# @yield the block to store; #update calls it with one Hash describing the
#   update
# @return [self] the receiver, so calls can be chained
# @raise [ArgumentError] when called without a block
def on_update(&block)
  # Without this guard a nil would be stored and every later #update would
  # fail when it tries to call it.
  raise ArgumentError, "on_update requires a block" unless block

  @update_callbacks << block
  self
end

#recent_prices(count = nil) ⇒ Object

Get recent prices



141
142
143
144
145
# File 'lib/sqa/stream.rb', line 141

# Returns buffered prices, read under the lock.
#
# @param count [Integer, nil] how many trailing prices to return; when nil
#   (or false) a copy of the whole buffer is returned
# @return [Array] a new Array; the internal buffer itself is never returned
def recent_prices(count = nil)
  @mutex.synchronize do
    series = @data_buffer[:prices]

    if count
      series.last(count)
    else
      series.dup
    end
  end
end

#recent_volumes(count = nil) ⇒ Object

Get recent volumes



148
149
150
151
152
# File 'lib/sqa/stream.rb', line 148

# Returns buffered volumes, read under the lock.
#
# @param count [Integer, nil] how many trailing volumes to return; when nil
#   (or false) a copy of the whole buffer is returned
# @return [Array] a new Array; the internal buffer itself is never returned
def recent_volumes(count = nil)
  @mutex.synchronize do
    series = @data_buffer[:volumes]

    if count
      series.last(count)
    else
      series.dup
    end
  end
end

#reset ⇒ Object

Reset the stream (clear all data)



199
200
201
202
203
204
205
206
207
# File 'lib/sqa/stream.rb', line 199

# Empties every data buffer and the indicator cache, and restores the signal
# and update counter to their initial values, all under the lock.
# Registered strategies and callbacks are left untouched.
#
# @return [self] the receiver, so calls can be chained
def reset
  @mutex.synchronize do
    # Clear the arrays in place so existing references to them stay valid.
    @data_buffer.each_value(&:clear)
    @indicator_cache.clear
    @update_count = 0
    @last_signal = :hold
  end
  self
end

#stats ⇒ Object

Get statistics about the stream



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/sqa/stream.rb', line 182

# Builds a snapshot of the stream's state while holding the lock.
#
# @return [Hash] with keys :ticker, :updates, :buffer_size, :window_size,
#   :current_price, :price_range, :last_signal, :strategies (a count) and
#   :callbacks (the number of signal callbacks)
def stats
  @mutex.synchronize do
    prices = @data_buffer[:prices]

    {
      ticker: @ticker,
      updates: @update_count,
      buffer_size: prices.length,
      window_size: @window_size,
      current_price: prices.last,
      price_range: price_range,
      last_signal: @last_signal,
      strategies: @strategies.length,
      callbacks: @signal_callbacks.length
    }
  end
end

#update(price:, volume: nil, high: nil, low: nil, timestamp: Time.now) ⇒ Object

Update stream with new market data

Required fields: price. Optional fields: volume, high, low, timestamp.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/sqa/stream.rb', line 98

# Records one market data point, then notifies update callbacks and runs the
# strategies when sufficient_data? says there is enough history.
#
# Buffered values are converted with #to_f; a missing volume is stored as 0.0
# and a missing high/low falls back to the price. Callbacks receive the
# arguments exactly as they were passed in (not the converted values).
#
# @param price [#to_f] latest price (required)
# @param volume [#to_f, nil] traded volume
# @param high [#to_f, nil] high for the period
# @param low [#to_f, nil] low for the period
# @param timestamp [Object] stored as-is; defaults to Time.now
# @return [true]
def update(price:, volume: nil, high: nil, low: nil, timestamp: Time.now)
  # Capture the counter while still holding the lock. Reading @update_count
  # after the lock is released could report a value already advanced by a
  # concurrent update.
  count = @mutex.synchronize do
    # Add to buffers
    @data_buffer[:prices] << price.to_f
    @data_buffer[:volumes] << (volume || 0).to_f
    @data_buffer[:highs] << (high || price).to_f
    @data_buffer[:lows] << (low || price).to_f
    @data_buffer[:timestamps] << timestamp

    # Trim buffers to window size
    trim_buffers

    # Clear indicator cache (will be recalculated on demand)
    @indicator_cache.clear

    @update_count += 1
  end

  # Prepare update data
  update_data = {
    price: price,
    volume: volume,
    high: high,
    low: low,
    timestamp: timestamp,
    count: count
  }

  # Notify update callbacks (outside the lock)
  @update_callbacks.each { |callback| callback.call(update_data) }

  # Process strategies if we have enough data
  process_strategies if sufficient_data?

  true
end