Class: FFWD::Processor::CountProcessor

Inherits:
Object
  • Object
show all
Includes:
Logging, FFWD::Processor, Reporter
Defined in:
lib/ffwd/processor/count.rb

Overview

Implements counting statistics (similar to statsd).

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Reporter

included, #increment, map_meta, #report!, #reporter_data

Methods included from FFWD::Processor

category, included, load_discovered, load_processors, #name, registry

Methods included from Logging

included, #log

Methods included from Lifecycle

#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks

Constructor Details

#initialize(emitter, config = {}) ⇒ CountProcessor

Returns a new instance of CountProcessor.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/ffwd/processor/count.rb', line 44

# Sets up an empty counter cache and registers lifecycle hooks.
#
# No defaults are applied here: each setting is read straight from +config+
# and is nil when the key is absent (see .prepare for the default values).
#
# @param emitter [Object] kept in @emitter for later metric emission
# @param config [Hash] read for :cache_limit, :timeout and :window
def initialize(emitter, config = {})
  @emitter = emitter

  @cache_limit, @timeout, @window =
    config[:cache_limit], config[:timeout], config[:window]

  # Counter entries by cache key, plus the pending flush timer (if any).
  @cache = {}
  @timer = nil

  # Block handed to `starting` (listed under Lifecycle) -- presumably run
  # when the processor starts.
  starting do
    log.info "Started"
    log.info "  config: #{config.inspect}"
  end

  # Block handed to `stopping`: cancels an outstanding timer and clears it.
  stopping do
    log.info "Stopping count processor"
    if @timer
      @timer.cancel
    end
    @timer = nil
  end
end

Class Method Details

.prepare(config) ⇒ Object



37
38
39
40
41
42
# File 'lib/ffwd/processor/count.rb', line 37

# Fills in missing (nil or false) settings on +config+ in place.
#
# Defaults: :cache_limit => 1000, :timeout => 300, :window => 30.
#
# @param config [Hash] settings to complete; mutated by this call
# @return [Hash] the very same +config+ object
def self.prepare(config)
  fallbacks = { :cache_limit => 1000, :timeout => 300, :window => 30 }
  fallbacks.each { |option, fallback| config[option] ||= fallback }
  config
end

Instance Method Details

#check_timer ⇒ Object



89
90
91
92
93
94
95
96
97
98
# File 'lib/ffwd/processor/count.rb', line 89

# Arms the flush timer unless one is already pending.
#
# The timer is created with @window as its interval; its callback clears
# @timer first and then runs digest! with the current time.
#
# @return [EM::Timer, nil] the new timer, or nil when one was already set
def check_timer
  unless @timer
    log.debug "Starting timer"

    @timer = EM::Timer.new(@window) do
      @timer = nil
      digest! Time.now
    end
  end
end

#digest!(now) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/ffwd/processor/count.rb', line 100

# Flushes the caches for +now+ and logs what FFWD.timing reported for it.
#
# @param now [Object] timestamp forwarded unchanged to flush_caches!
# @return [Object] whatever log.debug returns
def digest!(now)
  elapsed = FFWD.timing { flush_caches!(now) }

  log.debug "Digest took #{elapsed}ms"
end

#flush_caches!(now) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/ffwd/processor/count.rb', line 66

# Walks the cache once: entries idle for longer than @timeout are removed,
# every other entry has its count emitted as a metric and reset to zero.
#
# @param now [Object] current time; +now - entry[:last]+ is compared
#   against @timeout
# @return [Hash] the @cache hash itself
def flush_caches!(now)
  @cache.delete_if do |_cache_key, entry|
    # A truthy block result tells delete_if to drop the entry.
    next true if now - entry[:last] > @timeout

    # Grab the accumulated count, then restart the counter for this entry.
    pending = entry[:count]
    entry[:count] = 0

    name = entry[:key]
    @emitter.metric.emit(
      :key => name, :value => pending, :source => name,
      :attributes => entry[:attributes], :tags => entry[:tags])

    false
  end
end

#process(m) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/ffwd/processor/count.rb', line 108

# Adds a metric's value to the counter for its (key, attributes) pair.
#
# A new counter is created on first sight of a pair unless the cache already
# holds @cache_limit entries, in which case the metric is counted as
# :dropped and ignored. Accepted metrics are counted as :received, refresh
# the entry's :last timestamp, and make sure the flush timer is armed.
#
# @param m [Hash] metric read for :key, :value, :attributes and :tags
# @return [Object] result of +increment :dropped+ when the metric is
#   dropped, otherwise the result of check_timer
def process m
  key = m[:key]
  value = m[:value]
  attributes = m[:attributes]

  now = Time.now

  # Look up by the pair itself rather than by its Integer #hash digest: two
  # different pairs can share a digest, which would silently merge their
  # counts under the first pair's key.
  cache_key = [key, attributes || {}]

  unless (entry = @cache[cache_key])
    return increment :dropped if @cache.size >= @cache_limit

    # Store a frozen snapshot of the attributes inside the key so a caller
    # mutating its own hash later cannot invalidate the cache index.
    stored_key = [key, (attributes || {}).dup.freeze].freeze

    entry = @cache[stored_key] = {
      :key => key,
      :count => 0, :last => now, :tags => m[:tags],
      :attributes => attributes}
  end

  increment :received
  entry[:count] += value
  entry[:last] = now
  check_timer
end