Class: FFWD::Processor::CountProcessor
Overview
Implements counting statistics (similar to statsd).
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
category, included, load_discovered, load_processors, #name, registry
Methods included from Logging
included, #log
Methods included from Lifecycle
#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks
Constructor Details
#initialize(emitter, config = {}) ⇒ CountProcessor
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/ffwd/processor/count.rb', line 44
def initialize emitter, config={}
@emitter = emitter
@cache_limit = config[:cache_limit]
@timeout = config[:timeout]
@window = config[:window]
@cache = {}
@timer = nil
starting do
log.info "Started"
log.info " config: #{config.inspect}"
end
stopping do
log.info "Stopping count processor"
@timer.cancel if @timer
@timer = nil
end
end
|
Class Method Details
.prepare(config) ⇒ Object
37
38
39
40
41
42
|
# File 'lib/ffwd/processor/count.rb', line 37
def self.prepare config
config[:cache_limit] ||= 1000
config[:timeout] ||= 300
config[:window] ||= 30
config
end
|
Instance Method Details
#check_timer ⇒ Object
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/ffwd/processor/count.rb', line 89
def check_timer
return if @timer
log.debug "Starting timer"
@timer = EM::Timer.new(@window) do
@timer = nil
digest! Time.now
end
end
|
#digest!(now) ⇒ Object
100
101
102
103
104
105
106
|
# File 'lib/ffwd/processor/count.rb', line 100
def digest! now
ms = FFWD.timing do
flush_caches! now
end
log.debug "Digest took #{ms}ms"
end
|
#flush_caches!(now) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/ffwd/processor/count.rb', line 66
def flush_caches! now
@cache.each do |cache_key, entry|
key = entry[:key]
count = entry[:count]
last = entry[:last]
if now - last > @timeout
@cache.delete cache_key
next
end
attributes = entry[:attributes]
tags = entry[:tags]
entry[:count] = 0
@emitter.metric.emit(
:key => key, :value => count, :source => key,
:attributes => attributes, :tags => tags)
end
end
|
#process(m) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'lib/ffwd/processor/count.rb', line 108
def process m
key = m[:key]
value = m[:value]
now = Time.now
cache_key = [key, (m[:attributes] || {})].hash
unless (entry = @cache[cache_key])
return increment :dropped if @cache.size >= @cache_limit
entry = @cache[cache_key] = {
:key => key,
:count => 0, :last => now, :tags => m[:tags],
:attributes => m[:attributes]}
end
increment :received
entry[:count] += value
entry[:last] = now
check_timer
end
|