Class: StreamingStats

Inherits:
Object
Defined in:
lib/streaming_stats.rb

Overview

Public: StreamingStats is a Ruby class that takes streaming numeric data and returns descriptive statistics with minimal overhead. A stream with n entries requires only about log2(n) storage. The main update method is `insert`, and the object can return:

  • n (number of values inserted)

  • sum

  • mean

  • stddev

  • variance

  • quantile (i.e. percentile)

  • min

  • max

The sum, mean, stddev, variance functions are calculated more or less as in the technical description here: www.johndcook.com/blog/standard_deviation/
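The running-statistics update can be sketched on its own as follows. This is a minimal illustration of the single-pass (Welford-style) recurrence, not the library's code; the class name RunningStats and the sample data are assumptions made for the example.

```ruby
# Hypothetical standalone accumulator; RunningStats is not part of the library.
class RunningStats
  attr_reader :n, :mean

  def initialize
    @n = 0
    @mean = 0.0
    @m2 = 0.0 # running sum of squared deviations from the mean
  end

  # Single-pass update: no inserted value needs to be stored.
  def insert(value)
    @n += 1
    delta = value - @mean
    @mean += delta / @n
    @m2 += delta * (value - @mean)
    value
  end

  # Population variance (divides by n).
  def variance
    return 0.0 if @n <= 1

    @m2 / @n
  end
end

data = [1.0, 2.0, 3.0, 4.0, 5.0]
stats = RunningStats.new
data.each { |x| stats.insert(x) }

# Two-pass reference calculation for comparison.
mean = data.sum / data.size
two_pass = data.sum { |x| (x - mean)**2 } / data.size

puts stats.mean     # 3.0
puts stats.variance # 2.0
puts two_pass       # 2.0
```

The streaming result agrees with the two-pass calculation while keeping only three numbers of state.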

The quantile method is a Ruby port of github.com/sengelha/streaming-percentiles-js. The variable names, etc. of the quantile method are adopted from that project.

The compression achieved can be estimated with the method compression_ratio.

> require 'streaming_stats'
> gk = StreamingStats.new(epsilon: 0.01); 10000.times { gk.insert rand }
=> 10000
> gk.n
=> 10000
> gk.sum
=> 4985.484627445102
> gk.mean
=> 0.4985484627445139
> gk.stddev
=> 0.288236161831176
> gk.variance
=> 0.08308008498716787
> gk.min
=> 0.0001414880872682156
> gk.max
=> 0.9999396732975679
> gk.quantile 0.1
=> 0.08869274826771956
> gk.quantile 0.5
=> 0.4944707523857559
> gk.quantile 0.9
=> 0.9004683944698589
> gk.quantile 0.999
=> 0.9999396732975679
> gk.compression_ratio
=> 0.9927

Constant Summary

GK_MAX_BAND = 999_999


Constructor Details

#initialize(epsilon: 0.1) ⇒ StreamingStats

epsilon - The allowable error: "As epsilon becomes smaller, the accuracy of the approximation improves, but the class consumes more memory." See www.stevenengelhardt.com/series/calculating-percentiles-on-streaming-data/



# File 'lib/streaming_stats.rb', line 61

def initialize(epsilon: 0.1)
  @n = 0
  @mean = 0.0
  @m2 = 0.0
  @sum = 0.0

  @epsilon = epsilon
  @one_over_2e = 1 / (2 * epsilon)
  @S = []
end
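To make the role of epsilon concrete, the sketch below computes the two quantities that the class derives from it: the compression interval 1 / (2 * epsilon), as in @one_over_2e, and the rank tolerance epsilon * n used by #quantile. The helper name epsilon_tradeoff and the stream length of 10,000 are assumptions for illustration only.

```ruby
# Hypothetical helper, not part of StreamingStats.
def epsilon_tradeoff(epsilon, n)
  {
    compress_every: 1 / (2 * epsilon), # mirrors @one_over_2e
    rank_tolerance: epsilon * n        # mirrors `en` in #quantile
  }
end

[0.1, 0.01, 0.001].each do |epsilon|
  t = epsilon_tradeoff(epsilon, 10_000)
  puts format('epsilon=%<e>.3f: compress every %<c>.0f inserts, rank tolerance %<r>.0f',
              e: epsilon, c: t[:compress_every], r: t[:rank_tolerance])
end
```

A smaller epsilon tightens the rank tolerance of each quantile answer, at the cost of compressing less often and keeping more tuples.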

Instance Attribute Details

#epsilon ⇒ Object (readonly)

Returns the value of attribute epsilon.



# File 'lib/streaming_stats.rb', line 56

def epsilon
  @epsilon
end

#mean ⇒ Object (readonly)

Returns the value of attribute mean.



# File 'lib/streaming_stats.rb', line 56

def mean
  @mean
end

#n ⇒ Object (readonly)

Returns the value of attribute n.



# File 'lib/streaming_stats.rb', line 56

def n
  @n
end

#sum ⇒ Object (readonly)

Returns the value of attribute sum.



# File 'lib/streaming_stats.rb', line 56

def sum
  @sum
end

Class Method Details

._construct_band_lookup(two_epsilon_n) ⇒ Object

Private: Constructs a band lookup



# File 'lib/streaming_stats.rb', line 217

def self._construct_band_lookup(two_epsilon_n)
  bands = Array.new(two_epsilon_n + 1)
  bands[0] = GK_MAX_BAND
  bands[two_epsilon_n] = 0 # when float?
  p = two_epsilon_n.floor
  (1..Math.log2(two_epsilon_n).ceil).each do |alpha|
    two_alpha_minus_1 = 2**(alpha - 1)
    two_alpha = 2**alpha
    lower = [p - two_alpha - (p % two_alpha), 0].max
    upper = p - two_alpha_minus_1 - (p % two_alpha_minus_1)
    ((lower + 1)..upper).each do |i|
      bands[i] = alpha
    end
  end
  bands
end
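As an illustration of what the lookup contains, here is a standalone copy of the method restricted to Integer input, run for a width of 8. The method name construct_band_lookup, the parameter name width, and the input value are assumptions for the example; in the class the argument is 2 * epsilon * n and may be a Float.

```ruby
GK_MAX_BAND = 999_999

# Standalone copy of the lookup, restricted to Integer input for clarity.
def construct_band_lookup(width)
  bands = Array.new(width + 1)
  bands[0] = GK_MAX_BAND # delta 0 is always in the highest band
  bands[width] = 0       # the largest delta is in band 0
  (1..Math.log2(width).ceil).each do |alpha|
    two_alpha_minus_1 = 2**(alpha - 1)
    two_alpha = 2**alpha
    lower = [width - two_alpha - (width % two_alpha), 0].max
    upper = width - two_alpha_minus_1 - (width % two_alpha_minus_1)
    ((lower + 1)..upper).each { |i| bands[i] = alpha }
  end
  bands
end

p construct_band_lookup(8)
# prints [999999, 3, 3, 3, 3, 2, 2, 1, 0]
```

The array is indexed by a tuple's delta: smaller deltas (older, more certain tuples) land in higher bands, and band sizes roughly double from one band to the next.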

.splice!(array, start, len, *replacements) ⇒ Object

From stackoverflow.com/questions/6892551/array-prototype-splice-in-ruby. Behaves like JavaScript's Array.prototype.splice, but is defined as a class method instead of being added to Array.



# File 'lib/streaming_stats.rb', line 260

def self.splice!(array, start, len, *replacements)
  r = array.slice!(start, len)
  array[start, 0] = replacements if replacements
  r
end
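A short usage sketch of the splice behaviour, written as a free-standing method so it runs without the class. The sample array and values are assumptions for illustration.

```ruby
# Standalone version of the helper: removes `len` elements at `start`,
# inserts the replacements there, and returns the removed elements.
def splice!(array, start, len, *replacements)
  removed = array.slice!(start, len)
  array[start, 0] = replacements
  removed
end

letters = %w[a b c d]
removed = splice!(letters, 1, 2, 'X')
p removed # ["b", "c"]
p letters # ["a", "X", "d"]

splice!(letters, 1, 0, 'Y') # a len of 0 is a pure insertion
p letters # ["a", "Y", "X", "d"]
```

The class uses both forms: a len of 0 to insert a new tuple, and a positive len to replace a run of tuples with one merged tuple.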

Instance Method Details

#_compress ⇒ Object

Private: Compresses the number of values stored



# File 'lib/streaming_stats.rb', line 187

def _compress
  two_epsilon_n = 2 * @epsilon * @n
  bands = StreamingStats._construct_band_lookup(two_epsilon_n)
  # We must always keep the first and last nodes as these
  # are global min/max
  i = @S.length - 2
  while i >= 1
    if bands[@S[i].delta] <= bands[@S[i + 1].delta]
      start_indx = i
      g_i_star = @S[i].g
      while start_indx >= 2 && (bands[@S[start_indx - 1].delta] < bands[@S[i].delta])
        start_indx -= 1
        g_i_star += @S[start_indx].g
      end
      if (g_i_star + @S[i + 1].g + @S[i + 1].delta) < two_epsilon_n
        # The below is a delete_tuples([start_indx, i]) operation
        merged = OpenStruct.new(
          v: @S[i + 1].v,
          g: g_i_star + @S[i + 1].g,
          delta: @S[i + 1].delta
        )
        StreamingStats.splice!(@S, start_indx, 2 + (i - start_indx), merged)
        i = start_indx
      end
    end
    i -= 1
  end
end

#_determine_delta(i) ⇒ Object

Private: Determine delta



# File 'lib/streaming_stats.rb', line 251

def _determine_delta(i)
  return 0 if @n < @one_over_2e
  return 0 if i.zero? || i == @S.size

  (2 * @epsilon * @n).floor - 1
end
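The delta rule can be tried in isolation with the sketch below. The free-standing method determine_delta and its explicit size, n, and epsilon parameters are assumptions standing in for the instance state (@S.size, @n, @epsilon).

```ruby
# Hypothetical standalone version; the real method reads instance variables.
def determine_delta(index, size, n, epsilon)
  return 0 if n < 1 / (2 * epsilon)             # too few values seen so far
  return 0 if index.zero? || index == size      # new global min or max

  (2 * epsilon * n).floor - 1
end

puts determine_delta(3, 10, 1, 0.25)   # 0 (n is below 1 / (2 * epsilon) = 2)
puts determine_delta(0, 10, 20, 0.25)  # 0 (inserted at the front)
puts determine_delta(10, 10, 20, 0.25) # 0 (inserted at the end)
puts determine_delta(3, 10, 20, 0.25)  # 9 (interior insert: floor(10.0) - 1)
```

Only interior insertions receive a non-zero delta, so the first and last tuples always hold the exact minimum and maximum.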

#_do_insert(v) ⇒ Object

Private: Actually does a new insertion into S



# File 'lib/streaming_stats.rb', line 235

def _do_insert(v)
  i = _find_insertion_index(v)
  delta = _determine_delta(i)
  tuple = OpenStruct.new(v: v, g: 1, delta: delta)
  StreamingStats.splice!(@S, i, 0, tuple)
  @S
end

#_find_insertion_index(value) ⇒ Object

Private: Find where to insert



# File 'lib/streaming_stats.rb', line 244

def _find_insertion_index(value)
  i = 0
  i += 1 while i < @S.size && value >= @S[i].v
  i
end
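Because the list is kept sorted by value, the same index could also be found with a binary search. The sketch below compares the linear scan with an alternative based on Array#bsearch_index; this alternative is a suggestion and not something the library does, and the Tuple struct (standing in for the OpenStruct tuples) and method names are assumptions.

```ruby
# Hypothetical stand-in for the OpenStruct tuples stored in @S.
Tuple = Struct.new(:v, :g, :delta)

# Same logic as the method above: first index whose value exceeds `value`.
def linear_index(tuples, value)
  i = 0
  i += 1 while i < tuples.size && value >= tuples[i].v
  i
end

# Alternative: binary search for the first tuple with v > value.
def binary_index(tuples, value)
  tuples.bsearch_index { |t| t.v > value } || tuples.size
end

tuples = [1, 3, 3, 5].map { |v| Tuple.new(v, 1, 0) }
[0, 3, 4, 6].each do |value|
  puts "#{value}: linear=#{linear_index(tuples, value)} binary=#{binary_index(tuples, value)}"
end
```

Both versions place a new value after any existing equal values, so they return the same index for every input.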

#compression_ratio ⇒ Object

Public: Returns the compression ratio achieved

Examples

compression_ratio
=> 0.991

Returns the compression ratio achieved



# File 'lib/streaming_stats.rb', line 182

def compression_ratio
  1.0 - (1.0 * @S.size / @n)
end
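The ratio is a fraction between 0 and 1, not a percentage. As a worked example, the free-standing sketch below assumes 73 stored tuples after 10,000 insertions; that tuple count is an assumption chosen to be consistent with the 0.9927 shown in the overview, not a value reported by the library.

```ruby
# Hypothetical standalone version taking the two counts as arguments.
def compression_ratio(stored, n)
  1.0 - (1.0 * stored / n)
end

puts compression_ratio(73, 10_000).round(4) # 0.9927
```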

#insert(value) ⇒ Object

Public: inserts a value from a stream, updating the state

value - The Numeric to be inserted

Examples

insert(100)
=> 100

Returns the Numeric inserted



# File 'lib/streaming_stats.rb', line 88

def insert(value)
  ## Basic stats accumulators
  @n += 1
  @sum += value
  delta = value - @mean
  @mean += (delta / @n)
  @m2 += (delta * (value - @mean))
  ## quantile work
  _compress if (@n % @one_over_2e).zero?
  _do_insert value
  value
end

#max ⇒ Object

Public: Returns the maximum value so far inserted

Examples

max
=> 500.0

Returns the maximum value



# File 'lib/streaming_stats.rb', line 170

def max
  @S.last.v
end

#min ⇒ Object

Public: Returns the minimum value so far inserted

Examples

min
=> 500.0

Returns the minimum value



# File 'lib/streaming_stats.rb', line 158

def min
  @S[0].v
end

#quantile(phi) ⇒ Object

Public: Returns the approximate quantile (percentile) at phi

phi - A Numeric between 0.0 and 1.0, inclusive

Examples

quantile(0.5)
=> 5.01

Returns the approximate quantile



# File 'lib/streaming_stats.rb', line 137

def quantile(phi)
  # raise (not throw) so that callers receive a rescuable ArgumentError
  raise ArgumentError, "#{phi} must be between 0.0 and 1.0 inclusive" unless phi.between?(0.0, 1.0)

  en = @epsilon * @n  # allowed rank error
  r = (phi * @n).ceil # target rank
  rmin = 0
  @S.each do |tuple|
    rmin += tuple.g
    rmax = rmin + tuple.delta
    return tuple.v if r - rmin <= en && rmax - r <= en
  end
  raise 'Unknown error'
end
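To see why the result is approximate, the rank-window check can be run against a small hand-built summary. The sketch below assumes an uncompressed summary of the values 1 to 10 (every tuple has g = 1 and delta = 0) with epsilon = 0.1; the Entry struct and the method approximate_quantile are hypothetical stand-ins for the class internals.

```ruby
# Hypothetical stand-in for the tuples stored in @S.
Entry = Struct.new(:v, :g, :delta)

# Returns the first value whose possible rank range [rmin, rmax]
# lies within epsilon * n of the target rank.
def approximate_quantile(summary, n, epsilon, phi)
  raise ArgumentError, "#{phi} must be between 0.0 and 1.0 inclusive" unless phi.between?(0.0, 1.0)

  en = epsilon * n
  r = (phi * n).ceil
  rmin = 0
  summary.each do |entry|
    rmin += entry.g
    rmax = rmin + entry.delta
    return entry.v if r - rmin <= en && rmax - r <= en
  end
  raise 'no entry within the rank tolerance'
end

summary = (1..10).map { |v| Entry.new(v, 1, 0) }
puts approximate_quantile(summary, 10, 0.1, 0.5) # 4
puts approximate_quantile(summary, 10, 0.1, 1.0) # 9
```

With a rank tolerance of epsilon * n = 1, the median query targets rank 5 but accepts the first value within one rank of it, which is 4. The answer is within the stated error bound rather than exact.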

#s ⇒ Object

Public: Returns the compression list. For debugging only.



# File 'lib/streaming_stats.rb', line 74

def s
  @S
end

#stddev ⇒ Object

Public: Returns the standard deviation of the streamed data. Returns 0.0 until at least two values have been inserted.

Examples

stddev
=> 1.414

Returns the standard deviation



# File 'lib/streaming_stats.rb', line 123

def stddev
  Math.sqrt(variance)
end

#variance ⇒ Object

Public: Returns the population variance (m2 / n) of the streamed data. Returns 0 until at least two values have been inserted.

Examples

variance
=> 2.00

Returns the variance



# File 'lib/streaming_stats.rb', line 109

def variance
  return 0 if @n <= 1

  @m2 / @n
end
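Since the method divides by n, it returns the population variance rather than the sample variance. The sketch below contrasts the two divisors; the helper names and the inputs (m2 = 10.0 and n = 5, which correspond to the values 1, 2, 3, 4, 5) are assumptions for illustration.

```ruby
# Hypothetical helpers; m2 is the running sum of squared deviations.
def population_variance(m2, n)
  n <= 1 ? 0.0 : m2 / n
end

def sample_variance(m2, n)
  n <= 1 ? 0.0 : m2 / (n - 1)
end

m2 = 10.0 # sum of squared deviations for the values 1, 2, 3, 4, 5
n = 5
puts population_variance(m2, n)                     # 2.0
puts sample_variance(m2, n)                         # 2.5
puts Math.sqrt(population_variance(m2, n)).round(3) # 1.414
```

For long streams the two divisors give nearly identical results, but they differ noticeably for small n.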