Class: FFWD::Processor::RateProcessor
- Inherits:
- Object
- Hierarchy: Object, FFWD::Processor::RateProcessor
- Includes:
- Logging, FFWD::Processor, Reporter
- Defined in:
- lib/ffwd/processor/rate.rb
Overview
Implements rate statistics (similar to derive in collectd).
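For readers unfamiliar with derive-style statistics, the idea is that the rate for a key is the change in its value divided by the time elapsed between two consecutive samples. The following is only a minimal sketch of that arithmetic: the helper name `derive_rate` and the sample hashes are hypothetical and are not part of FFWD.

```ruby
# Hypothetical helper, not part of FFWD: computes a derive-style rate
# from two samples shaped like the entries RateProcessor keeps in its cache.
def derive_rate(prev, curr, precision = 3)
  diff = curr[:time] - prev[:time]   # elapsed seconds between the samples
  return nil unless diff > 0         # no rate unless time moved forward
  rate = (curr[:value] - prev[:value]) / diff.to_f
  rate.round(precision)
end

prev = { :time => 100.0, :value => 1000 }
curr = { :time => 110.0, :value => 1500 }

# 500 units over 10 seconds
puts derive_rate(prev, curr)  # prints 50.0
```

Returning nil for a non-positive time difference is a choice made for this sketch; it avoids dividing by zero when two samples carry the same timestamp.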
Class Method Summary
- .prepare(config = {}) ⇒ Object
Fills in default values for any options that are not set.
Instance Method Summary
- #expire! ⇒ Object
Expire cached events that are too old.
- #initialize(emitter, config = {}) ⇒ RateProcessor
constructor
A new instance of RateProcessor.
- #process(msg) ⇒ Object
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
Methods included from FFWD::Processor
category, included, load_discovered, load_processors, #name, registry
Methods included from Logging
Methods included from Lifecycle
#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks
Constructor Details
#initialize(emitter, config = {}) ⇒ RateProcessor
    # File 'lib/ffwd/processor/rate.rb', line 55

    def initialize emitter, config={}
      @emitter = emitter

      @precision = config[:precision]
      @limit = config[:cache_limit]
      @min_age = config[:min_age]
      @ttl = config[:ttl]

      # keep a reference to the expire cache to prevent having to allocate it
      # all the time.
      @expire = Hash.new
      # Cache of active events.
      @cache = Hash.new

      starting do
        @timer = EM.add_periodic_timer(@ttl){expire!} unless @ttl.nil?
        log.info "Started"
        log.info " config: #{config.inspect}"
      end

      stopping do
        log.info "Stopping rate processor"
        @timer.cancel if @timer
      end
    end
Class Method Details
.prepare(config = {}) ⇒ Object
Options:
:precision - The precision to round to for emitted values.
:cache_limit - Maximum number of items allowed in the cache.
:min_age - Minimum age that an item must have in the cache before a rate is calculated from it. This prevents overly frequent updates to the cache, which would yield very static or jumpy rates. Can be set to nil to disable.
:ttl - Allowed age of items in the cache, in seconds. If this is nil, items never expire, so old elements will not be expunged until data type is restarted.
    # File 'lib/ffwd/processor/rate.rb', line 47

    def self.prepare config={}
      config[:precision] ||= 3
      config[:cache_limit] ||= 10000
      config[:min_age] ||= 0.5
      config[:ttl] ||= 600
      config
    end
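One consequence of the `||=` defaulting is worth seeing in isolation: an option explicitly set to nil is treated the same as an unset one. The sketch below uses a stand-in function (`prepare_defaults`, a hypothetical name) that copies the defaulting logic so it can run without FFWD. Whether a configuration always passes through .prepare before reaching the constructor is an assumption this page does not confirm.

```ruby
# Stand-in mirroring the defaulting logic of .prepare; the name
# prepare_defaults is hypothetical and not part of FFWD.
def prepare_defaults(config = {})
  config[:precision]   ||= 3
  config[:cache_limit] ||= 10000
  config[:min_age]     ||= 0.5
  config[:ttl]         ||= 600
  config
end

explicit = prepare_defaults(:ttl => 60)
defaults = prepare_defaults
with_nil = prepare_defaults(:ttl => nil)

puts explicit[:ttl]  # prints 60
puts defaults[:ttl]  # prints 600
puts with_nil[:ttl]  # prints 600, since nil is falsy and ||= assigns the default
```

Under that assumption, a nil :ttl or :min_age would only reach the constructor if the configuration hash bypassed this defaulting step.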
Instance Method Details
#expire! ⇒ Object
Expire cached events that are too old.
    # File 'lib/ffwd/processor/rate.rb', line 82

    def expire!
      now = Time.new

      # Collect the entries that are still within the TTL.
      @cache.each do |key, value|
        diff = (now - value[:time])
        next if diff >= @ttl
        @expire[key] = value
      end

      # Count what was dropped, then swap in the surviving entries.
      expired = @cache.size - @expire.size
      increment :expired, expired if expired > 0
      @cache = @expire
      @expire = Hash.new
    end
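Expiring entries that are too old amounts to partitioning the cache by age against the TTL. The following stand-alone sketch illustrates that idea outside of FFWD. The function name `expire_older_than` and the sample keys are hypothetical, and the sketch returns a new hash rather than mutating instance state.

```ruby
# Hypothetical stand-alone sketch, not FFWD code: keep only the entries
# whose :time is less than ttl seconds before now.
def expire_older_than(cache, ttl, now = Time.now)
  fresh = cache.select { |_key, entry| (now - entry[:time]) < ttl }
  [fresh, cache.size - fresh.size]
end

now = Time.at(1_000)
cache = {
  "cpu"  => { :time => Time.at(990), :value => 1 },  # 10 seconds old
  "disk" => { :time => Time.at(300), :value => 2 }   # 700 seconds old
}

fresh, expired = expire_older_than(cache, 600, now)
puts fresh.keys.inspect  # prints ["cpu"]
puts expired             # prints 1
```

Passing `now` as a parameter keeps the sketch deterministic, which makes the age comparison easy to check by hand.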
#process(msg) ⇒ Object
    # File 'lib/ffwd/processor/rate.rb', line 98

    def process msg
      key = msg[:key]
      time = msg[:time]
      value = msg[:value] || 0

      unless (prev = @cache[key]).nil?
        prev_time = prev[:time]
        prev_value = prev[:value]

        diff = (time - prev_time)

        # Use || rather than `or`: `or` binds more loosely than `=`, so the
        # comparison would otherwise never reach the assigned value.
        valid = @ttl.nil? || diff < @ttl
        aged = @min_age.nil? || diff > @min_age

        if diff > 0 and valid and aged
          rate = ((value - prev_value) / diff)
          rate = rate.round(@precision) unless @precision.nil?
          @emitter.metric.emit(
            :key => "#{key}.rate", :source => key, :value => rate)
        end
      else
        # New key: refuse it when the cache is already full.
        return increment :dropped if @cache.size >= @limit
      end

      increment :received
      @cache[key] = {:key => key, :time => time, :value => value}
    end
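The `valid` and `aged` checks in #process each combine a nil test with a comparison on the right-hand side of an assignment, which is a spot where Ruby's operator precedence matters. The sketch below is independent of FFWD and uses illustrative local variables (`ttl`, `diff`, `loose`, `tight`) to show how `or` and `||` differ there.

```ruby
ttl  = 600
diff = 5

# Parsed as (loose = ttl.nil?) or (diff < ttl):
# only the nil test is assigned to loose.
loose = ttl.nil? or diff < ttl

# Parsed as tight = (ttl.nil? || (diff < ttl)):
# the whole condition is assigned to tight.
tight = ttl.nil? || diff < ttl

puts loose.inspect  # prints false
puts tight.inspect  # prints true
```

With a non-nil TTL, the `or` form therefore yields false regardless of the time difference, so a gate built on it never opens.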