Class: FFWD::Processor::RateProcessor

Inherits:
Object
  • Object
show all
Includes:
Logging, FFWD::Processor, Reporter
Defined in:
lib/ffwd/processor/rate.rb

Overview

Implements rate statistics (similar to derive in collectd).

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Reporter

included, #increment, map_meta, #report!, #reporter_data

Methods included from FFWD::Processor

category, included, load_discovered, load_processors, #name, registry

Methods included from Logging

included, #log

Methods included from Lifecycle

#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks

Constructor Details

#initialize(emitter, config = {}) ⇒ RateProcessor

Returns a new instance of RateProcessor.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ffwd/processor/rate.rb', line 55

def initialize emitter, config={}
  @emitter = emitter

  @precision = config[:precision]
  @limit = config[:cache_limit]
  @min_age = config[:min_age]
  @ttl = config[:ttl]

  # keep a reference to the expire cache to prevent having to allocate it
  # all the time.
  @expire = Hash.new
  # Cache of active events.
  @cache = Hash.new

  starting do
    @timer = EM.add_periodic_timer(@ttl){expire!} unless @ttl.nil?
    log.info "Started"
    log.info "  config: #{config.inspect}"
  end

  stopping do
    log.info "Stopping rate processor"
    @timer.cancel if @timer
  end
end

Class Method Details

.prepare(config = {}) ⇒ Object

Options:

:precision - The precision to round to for emitted values. :cache_limit - Maxiumum amount of items allowed in the cache. :min_age - Minimum age that an item has to have in the cache to calculate rates. This exists to prevent too frequent updates to the cache which would yield very static or jumpy rates. Can be set to null to disable. :ttl - Allowed age of items in cache in seconds. If this is nil, items will never expire, so old elements will not be expunged until data type is restarted.



47
48
49
50
51
52
53
# File 'lib/ffwd/processor/rate.rb', line 47

def self.prepare config={}
  config[:precision] ||= 3
  config[:cache_limit] ||= 10000
  config[:min_age] ||= 0.5
  config[:ttl] ||= 600
  config
end

Instance Method Details

#expire!Object

Expire cached events that are too old.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/ffwd/processor/rate.rb', line 82

def expire!
  now = Time.new

  @cache.each do |key, value|
    diff = (now - value[:time])
    next if diff < @ttl
    @expire[key] = value
  end

  unless @expire.empty?
    increment :expired, @cache.size - @expire.size
    @cache = @expire
    @expire = Hash.new
  end
end

#process(msg) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/ffwd/processor/rate.rb', line 98

def process msg
  key = msg[:key]
  time = msg[:time]
  value = msg[:value] || 0

  unless (prev = @cache[key]).nil?
    prev_time = prev[:time]
    prev_value = prev[:value]

    diff = (time - prev_time)

    valid = @ttl.nil? or diff < @ttl
    aged = @min_age.nil? or diff > @min_age

    if diff > 0 and valid and aged
      rate = ((value - prev_value) / diff)
      rate = rate.round(@precision) unless @precision.nil?
      @emitter.metric.emit(
        :key => "#{key}.rate", :source => key, :value => rate)
    end
  else
    return increment :dropped if @cache.size >= @limit
  end

  increment :received
  @cache[key] = {:key => key, :time => time, :value => value}
end