Class: SQA::Stream
- Inherits:
-
Object
- Object
- SQA::Stream
- Defined in:
- lib/sqa/stream.rb
Instance Attribute Summary collapse
-
#data_buffer ⇒ Object
readonly
Returns the value of attribute data_buffer.
-
#indicator_cache ⇒ Object
readonly
Returns the value of attribute indicator_cache.
-
#strategies ⇒ Object
readonly
Returns the value of attribute strategies.
-
#ticker ⇒ Object
readonly
Returns the value of attribute ticker.
-
#window_size ⇒ Object
readonly
Returns the value of attribute window_size.
Instance Method Summary collapse
-
#add_strategy(strategy) ⇒ Object
Add a strategy to the stream processor.
-
#current_price ⇒ Object
Get current price.
-
#current_signal ⇒ Object
Get current trading signal from last strategy execution.
-
#indicator(name, **options) ⇒ Object
Calculate or retrieve cached indicator.
-
#initialize(ticker:, window_size: 100, strategies: []) ⇒ Stream
constructor
A new instance of Stream.
-
#on_signal(&block) ⇒ Object
Register callback for trading signals.
-
#on_update(&block) ⇒ Object
Register callback for price updates.
-
#recent_prices(count = nil) ⇒ Object
Get recent prices.
-
#recent_volumes(count = nil) ⇒ Object
Get recent volumes.
-
#reset ⇒ Object
Reset the stream (clear all data).
-
#stats ⇒ Object
Get statistics about the stream.
-
#update(price:, volume: nil, high: nil, low: nil, timestamp: Time.now) ⇒ Object
Update stream with new market data.
Constructor Details
#initialize(ticker:, window_size: 100, strategies: []) ⇒ Stream
Returns a new instance of Stream.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/sqa/stream.rb', line 42 def initialize(ticker:, window_size: 100, strategies: []) @ticker = ticker @window_size = window_size @strategies = Array(strategies) @signal_callbacks = [] @update_callbacks = [] # Thread-safe data structures @mutex = Mutex.new @data_buffer = { prices: [], volumes: [], highs: [], lows: [], timestamps: [] } @indicator_cache = {} @last_signal = :hold @update_count = 0 end |
Instance Attribute Details
#data_buffer ⇒ Object (readonly)
Returns the value of attribute data_buffer.
40 41 42 |
# File 'lib/sqa/stream.rb', line 40 def data_buffer @data_buffer end |
#indicator_cache ⇒ Object (readonly)
Returns the value of attribute indicator_cache.
40 41 42 |
# File 'lib/sqa/stream.rb', line 40 def indicator_cache @indicator_cache end |
#strategies ⇒ Object (readonly)
Returns the value of attribute strategies.
40 41 42 |
# File 'lib/sqa/stream.rb', line 40 def strategies @strategies end |
#ticker ⇒ Object (readonly)
Returns the value of attribute ticker.
40 41 42 |
# File 'lib/sqa/stream.rb', line 40 def ticker @ticker end |
#window_size ⇒ Object (readonly)
Returns the value of attribute window_size.
40 41 42 |
# File 'lib/sqa/stream.rb', line 40 def window_size @window_size end |
Instance Method Details
#add_strategy(strategy) ⇒ Object
Add a strategy to the stream processor
65 66 67 68 69 70 |
# File 'lib/sqa/stream.rb', line 65 def add_strategy(strategy) @mutex.synchronize do @strategies << strategy end self end |
#current_price ⇒ Object
Get current price
136 137 138 |
# File 'lib/sqa/stream.rb', line 136 def current_price @mutex.synchronize { @data_buffer[:prices].last } end |
#current_signal ⇒ Object
Get current trading signal from last strategy execution
177 178 179 |
# File 'lib/sqa/stream.rb', line 177 def current_signal @last_signal end |
#indicator(name, **options) ⇒ Object
Calculate or retrieve cached indicator
Example:
rsi = stream.indicator(:rsi, period: 14)
sma = stream.indicator(:sma, period: 20)
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/sqa/stream.rb', line 159 def indicator(name, **options) cache_key = [name, options].hash @mutex.synchronize do return @indicator_cache[cache_key] if @indicator_cache.key?(cache_key) prices = @data_buffer[:prices].dup volumes = @data_buffer[:volumes].dup highs = @data_buffer[:highs].dup lows = @data_buffer[:lows].dup result = calculate_indicator(name, prices, volumes, highs, lows, **options) @indicator_cache[cache_key] = result result end end |
#on_signal(&block) ⇒ Object
Register callback for trading signals
Example:
stream.on_signal do |signal, data|
puts "#{signal.upcase} at $#{data[:price]}"
end
78 79 80 81 |
# File 'lib/sqa/stream.rb', line 78 def on_signal(&block) @signal_callbacks << block self end |
#on_update(&block) ⇒ Object
Register callback for price updates
Example:
stream.on_update do |data|
puts "Price updated: $#{data[:price]}"
end
89 90 91 92 |
# File 'lib/sqa/stream.rb', line 89 def on_update(&block) @update_callbacks << block self end |
#recent_prices(count = nil) ⇒ Object
Get recent prices
141 142 143 144 145 |
# File 'lib/sqa/stream.rb', line 141 def recent_prices(count = nil) @mutex.synchronize do count ? @data_buffer[:prices].last(count) : @data_buffer[:prices].dup end end |
#recent_volumes(count = nil) ⇒ Object
Get recent volumes
148 149 150 151 152 |
# File 'lib/sqa/stream.rb', line 148 def recent_volumes(count = nil) @mutex.synchronize do count ? @data_buffer[:volumes].last(count) : @data_buffer[:volumes].dup end end |
#reset ⇒ Object
Reset the stream (clear all data)
199 200 201 202 203 204 205 206 207 |
# File 'lib/sqa/stream.rb', line 199 def reset @mutex.synchronize do @data_buffer.each { |_, v| v.clear } @indicator_cache.clear @last_signal = :hold @update_count = 0 end self end |
#stats ⇒ Object
Get statistics about the stream
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/sqa/stream.rb', line 182 def stats @mutex.synchronize do { ticker: @ticker, updates: @update_count, buffer_size: @data_buffer[:prices].size, window_size: @window_size, current_price: @data_buffer[:prices].last, price_range: price_range, last_signal: @last_signal, strategies: @strategies.size, callbacks: @signal_callbacks.size } end end |
#update(price:, volume: nil, high: nil, low: nil, timestamp: Time.now) ⇒ Object
Update stream with new market data
Required fields: price. Optional fields: volume, high, low, timestamp.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/sqa/stream.rb', line 98 def update(price:, volume: nil, high: nil, low: nil, timestamp: Time.now) @mutex.synchronize do # Add to buffers @data_buffer[:prices] << price.to_f @data_buffer[:volumes] << (volume || 0).to_f @data_buffer[:highs] << (high || price).to_f @data_buffer[:lows] << (low || price).to_f @data_buffer[:timestamps] << timestamp # Trim buffers to window size trim_buffers # Clear indicator cache (will be recalculated on demand) @indicator_cache.clear @update_count += 1 end # Prepare update data update_data = { price: price, volume: volume, high: high, low: low, timestamp: timestamp, count: @update_count } # Notify update callbacks @update_callbacks.each { |callback| callback.call(update_data) } # Process strategies if we have enough data process_strategies if sufficient_data? true end |