Class: NetworkResiliency::Stats

Inherits:
Object
  • Object
show all
Defined in:
lib/network_resiliency/stats.rb

Constant Summary collapse

MIN_SAMPLE_SIZE =
300
MAX_WINDOW_LENGTH =
1000
STATS_TTL =

1 day, expressed in seconds (expiry applied to the stored stats list)

24 * 60 * 60
CACHE_TTL =

Lifetime of the cached aggregate, in seconds

120
LUA_SCRIPT =
"local results = {}\n\nfor i = 0, #KEYS / 2 - 1 do\n  local state_key = KEYS[i * 2 + 1]\n  local cache_key = KEYS[i * 2 + 2]\n\n  local n = tonumber(ARGV[i * 3 + 1])\n  local avg = ARGV[i * 3 + 2]\n  local sq_dist = math.floor(ARGV[i * 3 + 3])\n\n  if n > 0 then\n    -- save new data\n    local window_len = redis.call(\n      'LPUSH',\n      state_key,\n      string.format('%d|%f|%d', n, avg, sq_dist)\n    )\n    redis.call('EXPIRE', state_key, \#{STATS_TTL})\n\n    if window_len > \#{MAX_WINDOW_LENGTH} then\n      -- trim stats to window length\n      redis.call('LTRIM', state_key, 0, \#{MAX_WINDOW_LENGTH - 1})\n    end\n  end\n\n  -- retrieve aggregated stats\n\n  local cached_stats = redis.call('GET', cache_key)\n  if cached_stats then\n    -- use cached stats\n    n, avg, sq_dist = string.match(cached_stats, \"(%d+)|([%d.]+)|(%d+)\")\n    n = tonumber(n)\n  else\n    -- calculate aggregated stats\n    n = 0\n    avg = 0.0\n    sq_dist = 0\n\n    local stats = redis.call('LRANGE', state_key, 0, -1)\n    for _, entry in ipairs(stats) do\n      local other_n, other_avg, other_sq_dist = string.match(entry, \"(%d+)|([%d.]+)|(%d+)\")\n      other_n = tonumber(other_n)\n      other_avg = tonumber(other_avg) + 0.0\n      other_sq_dist = tonumber(other_sq_dist)\n\n      local prev_n = n\n      n = n + other_n\n\n      local delta = other_avg - avg\n      avg = avg + delta * other_n / n\n\n      sq_dist = sq_dist + other_sq_dist\n      sq_dist = sq_dist + (delta ^ 2) * prev_n * other_n / n\n    end\n\n    -- update cache\n    if n >= \#{MIN_SAMPLE_SIZE} then\n      cached_stats = string.format('%d|%f|%d', n, avg, sq_dist)\n      redis.call('SET', cache_key, cached_stats, 'EX', \#{CACHE_TTL})\n    end\n  end\n\n  -- accumulate results\n  table.insert(results, n)\n  table.insert(results, tostring(avg))\n  table.insert(results, sq_dist)\nend\n\nreturn results\n"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(values = []) ⇒ Stats

Returns a new instance of Stats.



29
30
31
32
33
34
# File 'lib/network_resiliency/stats.rb', line 29

# Builds a new accumulator, optionally seeded with initial samples.
#
# @param values [#each] samples fed one by one to +update+
def initialize(values = [])
  # The mutex is created before any state is touched.
  @lock = Thread::Mutex.new
  reset

  values.each do |sample|
    update(sample)
  end
end

Instance Attribute Details

#avg ⇒ Object (readonly)

Returns the value of attribute avg.



3
4
5
# File 'lib/network_resiliency/stats.rb', line 3

# Reader for the +@avg+ attribute.
#
# @return [Object] the current value of +@avg+
def avg; @avg; end

#n ⇒ Object (readonly)

Returns the value of attribute n.



3
4
5
# File 'lib/network_resiliency/stats.rb', line 3

# Reader for the +@n+ attribute.
#
# @return [Object] the current value of +@n+
def n; @n; end

Class Method Details

.fetch(redis, keys) ⇒ Object



207
208
209
210
211
212
# File 'lib/network_resiliency/stats.rb', line 207

# Retrieves stats for one or several keys without contributing new samples:
# every key is paired with a freshly built (blank) instance and handed to
# +sync+.
#
# @param redis [Object] client forwarded unchanged to +sync+
# @param keys [Object, Array] a single key or an array of keys
# @return [Hash, Object] the hash returned by +sync+ when +keys+ is an
#   Array, otherwise just the entry stored under +keys+
def self.fetch(redis, keys)
  blanks = Array(keys).each_with_object({}) do |key, acc|
    acc[key] = new
  end
  fetched = sync(redis, **blanks)

  # Callers passing an array get the whole mapping back.
  return fetched if keys.is_a?(Array)

  fetched[keys]
end

.from(n:, avg:, sq_dist:) ⇒ Object



6
7
8
9
10
11
12
13
14
# File 'lib/network_resiliency/stats.rb', line 6

# Builds an instance directly from previously computed aggregates.
#
# @param n [#to_i] value stored in +@n+ after +to_i+
# @param avg [#to_f] value stored in +@avg+ after +to_f+
# @param sq_dist [#to_f] value stored in +@sq_dist+ after +to_f+
# @return [Object] the newly created instance
def from(n:, avg:, sq_dist:)
  instance = new

  # Overwrite the freshly initialised state with the coerced aggregates.
  instance.instance_variable_set(:@n, n.to_i)
  instance.instance_variable_set(:@avg, avg.to_f)
  instance.instance_variable_set(:@sq_dist, sq_dist.to_f)

  instance
end

.sync(redis, **data) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/network_resiliency/stats.rb', line 188

# Sends the local aggregates for every given key to Redis via LUA_SCRIPT and
# returns what the script reports back for each key.
#
# For each entry two Redis keys are passed (the stats key and its cache key)
# together with three arguments: n, avg and sq_dist of the local stats.
# The script reply is consumed in groups of three, in the same key order.
#
# @param redis [#eval] client used to evaluate the script
# @param data [Hash] mapping of key => stats object
# @return [Hash] mapping of key => instance built by +Stats.from+
def self.sync(redis, **data)
  redis_keys = []
  script_args = []

  data.each do |key, stats|
    redis_keys.push(
      "network_resiliency:stats:#{key}",
      "network_resiliency:stats:cache:#{key}"
    )

    # sq_dist is not publicly readable, hence the send.
    script_args.push(stats.n, stats.avg, stats.send(:sq_dist))
  end

  reply = redis.eval(LUA_SCRIPT, redis_keys, script_args)

  data.keys.zip(reply.each_slice(3)).each_with_object({}) do |(key, triple), synced|
    n, avg, sq_dist = triple
    synced[key] = Stats.from(n: n, avg: avg, sq_dist: sq_dist)
  end
end

Instance Method Details

#<<(value) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/network_resiliency/stats.rb', line 36

# Feeds new data into the accumulator and returns self so calls can be
# chained.
#
# @param value [Array, Object] an Array (each element goes to +update+),
#   another instance of this class (combined via +merge!+), or anything
#   else (passed to +update+ as a single sample)
# @return [self]
def <<(value)
  if value.is_a?(Array)
    value.each { |sample| update(sample) }
  elsif value.is_a?(self.class)
    merge!(value)
  else
    update(value)
  end

  self
end

#==(other) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/network_resiliency/stats.rb', line 93

# Value equality: two instances are equal when n, avg and sq_dist all match.
#
# @param other [Object] object to compare against
# @return [Boolean] false for anything that is not an instance of this
#   class (or a subclass)
def ==(other)
  if other.is_a?(self.class)
    # Exact comparison of all three accumulators, short-circuiting on the
    # first mismatch.
    (@n == other.n) && (@avg == other.avg) && (@sq_dist == other.sq_dist)
  else
    false
  end
end

#merge(other) ⇒ Object Also known as: +



57
58
59
# File 'lib/network_resiliency/stats.rb', line 57

# Non-destructive counterpart of +merge!+: the receiver is left untouched
# and the combination is performed on a duplicate.
#
# @param other [Object] stats to combine with, forwarded to +merge!+
# @return [Object] whatever +merge!+ returns when called on the copy
def merge(other)
  copy = dup
  copy.merge!(other)
end

#stdev ⇒ Object



53
54
55
# File 'lib/network_resiliency/stats.rb', line 53

# Standard deviation, i.e. the square root of +variance+.
#
# @param sample [Boolean] forwarded to +variance+; when true the sample
#   variance is used instead of the population variance. Defaults to false,
#   which matches the previous zero-argument behaviour.
# @return [Float]
def stdev(sample: false)
  Math.sqrt(variance(sample: sample))
end

#sync(redis, key) ⇒ Object



184
185
186
# File 'lib/network_resiliency/stats.rb', line 184

# Synchronises this instance under a single key by delegating to the
# class-level +sync+.
#
# @param redis [Object] client forwarded to the class-level +sync+
# @param key [Object] key under which this instance is submitted
# @return [Object] the entry stored under +key+ in the class-level result
def sync(redis, key)
  synced = self.class.sync(redis, key => self)
  synced[key]
end

#to_s ⇒ Object



214
215
216
# File 'lib/network_resiliency/stats.rb', line 214

# Human-readable summary containing the class name, object id and the three
# accumulators.
#
# @return [String]
def to_s
  identity = "#{self.class.name}:#{object_id}"
  state = "n=#{n} avg=#{avg} sq_dist=#{sq_dist}"

  "#<#{identity} #{state}>"
end

#variance(sample: false) ⇒ Object



49
50
51
# File 'lib/network_resiliency/stats.rb', line 49

# Variance derived from the accumulated squared distance (+@sq_dist+) and
# the sample count (+@n+).
#
# @param sample [Boolean] when true divide by +n - 1+ (sample variance),
#   otherwise by +n+ (population variance)
# @return [Numeric] 0 when there are not enough samples for the requested
#   estimator (no samples at all, or a single sample with +sample: true+)
def variance(sample: false)
  divisor = sample ? @n - 1 : @n

  # Guard against dividing by zero: previously a single observation with
  # sample: true produced NaN (or ZeroDivisionError for an Integer @sq_dist).
  return 0 unless divisor > 0

  @sq_dist / divisor
end