Class: Metriks::ExponentiallyDecayingSample

Inherits:
Object
  • Object
show all
Defined in:
lib/metriks/exponentially_decaying_sample.rb

Constant Summary collapse

RESCALE_THRESHOLD =

1 hour

60 * 60

Instance Method Summary collapse

Constructor Details

#initialize(reservoir_size, alpha, values = nil) ⇒ ExponentiallyDecayingSample

Returns a new instance of ExponentiallyDecayingSample.



9
10
11
12
13
14
15
16
17
# File 'lib/metriks/exponentially_decaying_sample.rb', line 9

def initialize(reservoir_size, alpha, values = nil)
  @values = values || ConcurrentRedBlackTree.new
  @count = Atomic.new(0)
  @next_scale_time = Atomic.new(0)
  @alpha = alpha
  @reservoir_size = reservoir_size
  @mutex = Mutex.new
  clear
end

Instance Method Details

#clearObject



19
20
21
22
23
24
25
26
# File 'lib/metriks/exponentially_decaying_sample.rb', line 19

def clear
  @mutex.synchronize do
    @values.clear
    @count.value = 0
    @next_scale_time.value = Time.now + RESCALE_THRESHOLD
    @start_time = Time.now
  end
end

#rescale(now, next_time) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/metriks/exponentially_decaying_sample.rb', line 77

def rescale(now, next_time)
  if @next_scale_time.compare_and_swap(next_time, now + RESCALE_THRESHOLD)
    @mutex.synchronize do
      old_start_time = @start_time
      @start_time = Time.now
      @values.keys.each do |key|
        value = @values.delete(key)
        new_key = key * Math.exp(-@alpha * (@start_time - old_start_time))

        if key.nan?
          warn "ExponentiallyDecayingSample found a key of NaN. old_start_time: #{old_start_time.to_f} start_time: #{@start_time.to_f}"
          next
        end

        if new_key.nan?
          warn "ExponentiallyDecayingSample found a new_key of NaN. key: #{key} old_start_time: #{old_start_time.to_f} start_time: #{@start_time.to_f}"
          next
        end

        @values[new_key] = value
      end
    end
  end
end

#sizeObject



28
29
30
31
# File 'lib/metriks/exponentially_decaying_sample.rb', line 28

def size
  count = @count.value
  count < @reservoir_size ? count : @reservoir_size
end

#snapshotObject



33
34
35
36
37
# File 'lib/metriks/exponentially_decaying_sample.rb', line 33

def snapshot
  @mutex.synchronize do
    Snapshot.new(@values.values)
  end
end

#update(value, timestamp = Time.now) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/metriks/exponentially_decaying_sample.rb', line 39

def update(value, timestamp = Time.now)
  @mutex.synchronize do
    priority = weight(timestamp - @start_time) / rand
    priority = Float::MAX if priority.infinite?
    new_count = @count.update { |v| v + 1 }

    if priority.nan?
      warn "ExponentiallyDecayingSample found priority of NaN. timestamp: #{timestamp.to_f} start_time: #{@start_time.to_f}"
      return
    end

    if new_count <= @reservoir_size
      @values[priority] = value
    else
      first_priority = @values.first[0]
      if first_priority < priority
        unless @values[priority]
          @values[priority] = value

          until @values.delete(first_priority)
            first_priority = @values.first[0]
          end
        end
      end
    end
  end

  now = Time.new
  next_time = @next_scale_time.value
  if now >= next_time
    rescale(now, next_time)
  end
end

#weight(time) ⇒ Object



73
74
75
# File 'lib/metriks/exponentially_decaying_sample.rb', line 73

def weight(time)
  Math.exp(@alpha * time)
end