Class: FFWD::Processor::RateProcessor
- Inherits:
-
Object
- Object
- FFWD::Processor::RateProcessor
- Includes:
- Logging, FFWD::Processor, Reporter
- Defined in:
- lib/ffwd/processor/rate.rb
Overview
Implements rate statistics (similar to derive in collectd).
Instance Method Summary collapse
-
#expire! ⇒ Object
Expire cached events that are too old.
-
#initialize(emitter, opts = {}) ⇒ RateProcessor
constructor
Options:.
- #process(msg) ⇒ Object
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
Methods included from FFWD::Processor
category, included, load_discovered, load_processors, #name, registry
Methods included from Logging
Methods included from Lifecycle
#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks
Constructor Details
#initialize(emitter, opts = {}) ⇒ RateProcessor
Options:
:precision - The precision to round to for emitted values. :cache_limit - Maxiumum amount of items allowed in the cache. :min_age - Minimum age that an item has to have in the cache to calculate rates. This exists to prevent too frequent updates to the cache which would yield very static or jumpy rates. Can be set to null to disable. :ttl - Allowed age of items in cache in seconds. If this is nil, items will never expire, so old elements will not be expunged until data type is restarted.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/ffwd/processor/rate.rb', line 47 def initialize emitter, opts={} @emitter = emitter @precision = opts[:precision] || 3 @limit = opts[:cache_limit] || 10000 @min_age = opts[:min_age] || 0.5 @ttl = opts[:ttl] || 600 # keep a reference to the expire cache to prevent having to allocate it # all the time. @expire = Hash.new # Cache of active events. @cache = Hash.new starting do log.info "Starting rate processor (ttl: #{@ttl})" @timer = EM.add_periodic_timer(@ttl){expire!} unless @ttl.nil? end stopping do log.info "Stopping rate processor" @timer.cancel if @timer end end |
Instance Method Details
#expire! ⇒ Object
Expire cached events that are too old.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/ffwd/processor/rate.rb', line 72 def expire! now = Time.new @cache.each do |key, value| diff = (now - value[:time]) next if diff < @ttl @expire[key] = value end unless @expire.empty? increment :expired, @cache.size - @expire.size @cache = @expire @expire = Hash.new end end |
#process(msg) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/ffwd/processor/rate.rb', line 88 def process msg key = msg[:key] time = msg[:time] value = msg[:value] || 0 unless (prev = @cache[key]).nil? prev_time = prev[:time] prev_value = prev[:value] diff = (time - prev_time) valid = @ttl.nil? or diff < @ttl aged = @min_age.nil? or diff > @min_age if diff > 0 and valid and aged rate = ((value - prev_value) / diff) rate = rate.round(@precision) unless @precision.nil? @emitter.metric.emit( :key => "#{key}.rate", :source => key, :value => rate) end else return increment :dropped if @cache.size >= @limit end increment :received @cache[key] = {:key => key, :time => time, :value => value} end |