Class: FFWD::Processor::CountProcessor
- Inherits:
-
Object
- Object
- FFWD::Processor::CountProcessor
- Includes:
- Logging, FFWD::Processor, Reporter
- Defined in:
- lib/ffwd/processor/count.rb
Overview
Implements counting statistics (similar to statsd).
Class Method Summary collapse
Instance Method Summary collapse
- #check_timer ⇒ Object
- #digest!(now) ⇒ Object
- #flush_caches!(now) ⇒ Object
-
#initialize(emitter, config = {}) ⇒ CountProcessor
constructor
A new instance of CountProcessor.
- #process(m) ⇒ Object
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
Methods included from FFWD::Processor
category, included, load_discovered, load_processors, #name, registry
Methods included from Logging
Methods included from Lifecycle
#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks
Constructor Details
#initialize(emitter, config = {}) ⇒ CountProcessor
Returns a new instance of CountProcessor.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/ffwd/processor/count.rb', line 44 def initialize emitter, config={} @emitter = emitter @cache_limit = config[:cache_limit] @timeout = config[:timeout] @window = config[:window] @cache = {} @timer = nil starting do log.info "Started" log.info " config: #{config.inspect}" end stopping do log.info "Stopping count processor" @timer.cancel if @timer @timer = nil end end |
Class Method Details
.prepare(config) ⇒ Object
37 38 39 40 41 42 |
# File 'lib/ffwd/processor/count.rb', line 37 def self.prepare config config[:cache_limit] ||= 1000 config[:timeout] ||= 300 config[:window] ||= 30 config end |
Instance Method Details
#check_timer ⇒ Object
89 90 91 92 93 94 95 96 97 98 |
# File 'lib/ffwd/processor/count.rb', line 89 def check_timer return if @timer log.debug "Starting timer" @timer = EM::Timer.new(@window) do @timer = nil digest! Time.now end end |
#digest!(now) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/ffwd/processor/count.rb', line 100 def digest! now ms = FFWD.timing do flush_caches! now end log.debug "Digest took #{ms}ms" end |
#flush_caches!(now) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/ffwd/processor/count.rb', line 66 def flush_caches! now @cache.each do |cache_key, entry| key = entry[:key] count = entry[:count] last = entry[:last] # OK to modify the hash being iterated over. if now - last > @timeout @cache.delete cache_key next end attributes = entry[:attributes] = entry[:tags] entry[:count] = 0 @emitter.metric.emit( :key => key, :value => count, :source => key, :attributes => attributes, :tags => ) end end |
#process(m) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/ffwd/processor/count.rb', line 108 def process m key = m[:key] value = m[:value] now = Time.now cache_key = [key, (m[:attributes] || {})].hash unless (entry = @cache[cache_key]) return increment :dropped if @cache.size >= @cache_limit entry = @cache[cache_key] = { :key => key, :count => 0, :last => now, :tags => m[:tags], :attributes => m[:attributes]} end increment :received entry[:count] += value entry[:last] = now check_timer end |