Class: FFWD::Processor::CountProcessor

Inherits:
Object
  • Object
show all
Includes:
Logging, FFWD::Processor, Reporter
Defined in:
lib/ffwd/processor/count.rb

Overview

Implements counting statistics (similar to statsd).

Instance Method Summary collapse

Methods included from Reporter

included, #increment, map_meta, #report!, #reporter_data

Methods included from FFWD::Processor

category, included, load_discovered, load_processors, #name, registry

Methods included from Logging

included, #log

Methods included from Lifecycle

#depend_on, #start, #started?, #starting, #starting_hooks, #stop, #stopped?, #stopping, #stopping_hooks

Constructor Details

#initialize(emitter, opts = {}) ⇒ CountProcessor

Returns a new instance of CountProcessor.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/ffwd/processor/count.rb', line 37

def initialize emitter, opts={}
  @emitter = emitter
  @cache_limit = opts[:cache_limit] || 1000
  @timeout = opts[:timeout] || 300
  @period = opts[:period] || 30
  @cache = {}
  @timer = nil

  starting do
    log.info "Starting count processor on a window of #{@period}s"
  end

  stopping do
    log.info "Stopping count processor"
    @timer.cancel if @timer
    @timer = nil
  end
end

Instance Method Details

#check_timerObject



79
80
81
82
83
84
85
86
87
88
# File 'lib/ffwd/processor/count.rb', line 79

def check_timer
  return if @timer

  log.debug "Starting timer"

  @timer = EM::Timer.new(@period) do
    @timer = nil
    digest! Time.now
  end
end

#digest!(now) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/ffwd/processor/count.rb', line 90

def digest! now
  ms = FFWD.timing do
    flush_caches! now
  end

  log.debug "Digest took #{ms}ms"
end

#flush_caches!(now) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/ffwd/processor/count.rb', line 56

def flush_caches! now
  @cache.each do |cache_key, entry|
    key = entry[:key]
    count = entry[:count]
    last = entry[:last]

    # OK to modify the hash being iterated over.
    if now - last > @timeout
      @cache.delete cache_key
      next
    end

    attributes = entry[:attributes]
    tags = entry[:tags]

    entry[:count] = 0

    @emitter.metric.emit(
      :key => key, :value => count, :source => key,
      :attributes => attributes, :tags => tags)
  end
end

#process(m) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/ffwd/processor/count.rb', line 98

def process m
  key = m[:key]
  value = m[:value]

  now = Time.now

  cache_key = [key, (m[:attributes] || {})].hash

  unless (entry = @cache[cache_key])
    return increment :dropped if @cache.size >= @cache_limit
    entry = @cache[cache_key] = {
      :key => key,
      :count => 0, :last => now, :tags => m[:tags],
      :attributes => m[:attributes]}
  end

  increment :received
  entry[:count] += value
  entry[:last] = now
  check_timer
end