Class: FFWD::Processor::CountProcessor
- Inherits:
- Object
- FFWD::Processor::CountProcessor
- Includes:
- Logging, FFWD::Processor, Reporter
- Defined in:
- lib/ffwd/processor/count.rb
Overview
Implements counting statistics (similar to statsd).
Instance Method Summary
- #check_timer ⇒ Object
- #digest!(now) ⇒ Object
- #flush_caches!(now) ⇒ Object
- #initialize(emitter, opts = {}) ⇒ CountProcessor (constructor): a new instance of CountProcessor.
- #process(m) ⇒ Object
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
Methods included from FFWD::Processor
category, included, load_discovered, load_processors, #name, registry
Methods included from Logging
Methods included from Lifecycle
#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks
Constructor Details
#initialize(emitter, opts = {}) ⇒ CountProcessor
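Returns a new instance of CountProcessor. Recognized keys in opts: :cache_limit (maximum number of distinct keys held in the cache, default 1000), :timeout (seconds without updates after which a key is evicted, default 300), and :period (length in seconds of the counting window before a flush, default 10).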
    # File 'lib/ffwd/processor/count.rb', line 37
    def initialize emitter, opts={}
      @emitter = emitter
      @cache_limit = opts[:cache_limit] || 1000
      @timeout = opts[:timeout] || 300
      @period = opts[:period] || 10
      @cache = {}
      @timer = nil

      starting do
        log.info "Starting count processor on a window of #{@period}s"
      end

      stopping do
        log.info "Stopping count processor"
        @timer.cancel if @timer
        @timer = nil
      end
    end
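The option handling above can be tried outside of FFWD. The following is a minimal sketch, not the library's code: `resolve_count_options` is a hypothetical helper that applies the same `opts[:key] || default` pattern, so an omitted or nil option falls back to its default.

```ruby
# Hypothetical helper (not part of FFWD) mirroring the defaults applied in
# CountProcessor#initialize.
def resolve_count_options(opts = {})
  {
    cache_limit: opts[:cache_limit] || 1000, # max number of distinct keys cached
    timeout:     opts[:timeout] || 300,      # seconds of inactivity before eviction
    period:      opts[:period] || 10         # seconds in one counting window
  }
end

defaults = resolve_count_options
custom = resolve_count_options(period: 60, timeout: nil)

puts defaults.inspect
puts custom.inspect
```

Because the pattern uses `||`, an explicit nil (or false) is treated the same as a missing key.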
Instance Method Details
#check_timer ⇒ Object
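    # File 'lib/ffwd/processor/count.rb', line 73
    def check_timer
      # A timer is already pending; nothing to do.
      return if @timer

      log.debug "Starting timer"

      # One-shot timer: clears itself, then flushes the cache.
      @timer = EM::Timer.new(@period) do
        @timer = nil
        # digest! takes the current time as a required argument; the listing
        # called it with none, which would raise ArgumentError.
        digest! Time.now
      end
    end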
#digest!(now) ⇒ Object
    # File 'lib/ffwd/processor/count.rb', line 84
    def digest! now
      ms = FFWD.timing do
        flush_caches! now
      end

      log.debug "Digest took #{ms}ms"
    end
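digest! relies on FFWD.timing, whose implementation is not shown on this page. Judging from the log message, it presumably runs the block and returns the elapsed time in milliseconds. A minimal sketch of such a helper under that assumption follows; `timing_ms` is a hypothetical name, not FFWD's API.

```ruby
# Hypothetical stand-in for FFWD.timing: runs the block and returns the
# elapsed time in milliseconds (assumed behaviour, not confirmed by the source).
def timing_ms
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  ((finished - started) * 1000).round(3)
end

ms = timing_ms { (1..1000).reduce(:+) }
puts "Digest took #{ms}ms"
```

A monotonic clock is used here so the measurement is not affected by system clock adjustments.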
#flush_caches!(now) ⇒ Object
    # File 'lib/ffwd/processor/count.rb', line 56
    def flush_caches! now
      @cache.each do |key, entry|
        count = entry[:count]
        last = entry[:last]

        # Evict keys that have not been updated within the timeout.
        if now - last > @timeout
          @cache.delete key
          next
        end

        # Reset the counter and emit the accumulated sum for this window.
        entry[:count] = 0

        @emitter.metric.emit(
          :key => "#{key}.sum", :value => count, :source => key)
      end
    end
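The flush logic can be exercised without an emitter. The sketch below is a stand-alone imitation, not the library's code: `flush` is a hypothetical function that collects the metrics it would emit into an array instead of calling `@emitter.metric.emit`.

```ruby
# Stand-alone sketch of the flush step; `flush` and its arguments are
# hypothetical and only imitate CountProcessor#flush_caches!.
def flush(cache, now, timeout)
  emitted = []

  cache.each do |key, entry|
    # Evict entries that have been idle for longer than the timeout.
    if now - entry[:last] > timeout
      cache.delete(key)
      next
    end

    # Report the accumulated count and start a new window at zero.
    count = entry[:count]
    entry[:count] = 0
    emitted << { key: "#{key}.sum", value: count, source: key }
  end

  emitted
end

now = Time.at(1_000)
cache = {
  "requests" => { count: 7, last: now - 5 },   # recently updated
  "stale"    => { count: 3, last: now - 301 }  # idle beyond the timeout
}

emitted = flush(cache, now, 300)
puts emitted.inspect
puts cache.keys.inspect
```

Note that an evicted key's pending count is discarded rather than emitted, and that an active key with a count of zero is still reported.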
#process(m) ⇒ Object
    # File 'lib/ffwd/processor/count.rb', line 92
    def process m
      key = m[:key]
      value = m[:value]

      now = Time.now

      # Admit a new key only while the cache is below its limit.
      unless (entry = @cache[key])
        return increment :dropped if @cache.size >= @cache_limit
        entry = @cache[key] = {:count => 0, :last => now}
      end

      increment :received

      # Accumulate the value and record when the key was last seen.
      entry[:count] += value
      entry[:last] = now

      check_timer
    end
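The admission and accumulation rules in process can be shown in isolation. This is a sketch under stated assumptions rather than the library's code: `count_metric` is a hypothetical function that returns :received or :dropped instead of calling the Reporter's increment, and it leaves out the timer.

```ruby
# Hypothetical stand-in for CountProcessor#process, without the reporter
# and timer; returns :received or :dropped instead of calling increment.
def count_metric(cache, metric, cache_limit, now = Time.now)
  key = metric[:key]

  unless (entry = cache[key])
    # New keys are rejected once the cache holds cache_limit entries.
    return :dropped if cache.size >= cache_limit
    entry = cache[key] = { count: 0, last: now }
  end

  # Existing keys keep accumulating regardless of the limit.
  entry[:count] += metric[:value]
  entry[:last] = now
  :received
end

cache = {}
results = [
  count_metric(cache, { key: "a", value: 2 }, 1),
  count_metric(cache, { key: "a", value: 3 }, 1),
  count_metric(cache, { key: "b", value: 1 }, 1)
]

puts results.inspect
puts cache["a"][:count]
```

With a limit of one key, the second update to "a" is still counted, while the previously unseen key "b" is dropped.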