Class: Fluent::UniqueCounterOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_unique_counter.rb

Constant Summary collapse

# Number of seconds in each value accepted by the 'unit' setting.
# Frozen so the shared lookup table cannot be modified at runtime.
UNITS = {
  minutes: 60,
  hours: 3600,
  days: 86_400
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#counts ⇒ Object

Returns the value of attribute counts.



13
14
15
# File 'lib/fluent/plugin/out_unique_counter.rb', line 13

# Reader for @counts, the array in which #emit accumulates unique-key values
# until #flush_emit replaces it with a fresh empty array.
def counts
  @counts
end

#last_checked ⇒ Object

Returns the value of attribute last_checked.



14
15
16
# File 'lib/fluent/plugin/out_unique_counter.rb', line 14

# Reader for @last_checked, the Fluent::Engine.now value that #watch recorded
# when it started or just before its most recent flush.
def last_checked
  @last_checked
end

#tick ⇒ Object

Returns the value of attribute tick.



12
13
14
# File 'lib/fluent/plugin/out_unique_counter.rb', line 12

# Reader for @tick.
# NOTE(review): none of the methods shown alongside this one assign @tick, so
# this presumably returns nil unless it is set elsewhere — confirm.
def tick
  @tick
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/fluent/plugin/out_unique_counter.rb', line 22

# Validates the optional 'unit' setting and prepares the plugin's state.
#
# When @unit is set it must name a key of UNITS; the matching number of
# seconds then replaces @count_interval. The value buffer (@counts) and the
# mutex that guards it are created last.
#
# @param conf configuration object, handed to the parent class through super
# @raise [Fluent::ConfigError] if @unit is set but is not a key of UNITS
def configure(conf)
  super

  if @unit
    seconds = UNITS[@unit.to_sym]
    if seconds.nil?
      raise Fluent::ConfigError, "'unit' must be one of minutes/hours/days"
    end
    @count_interval = seconds
  end

  @counts = []
  @mutex = Mutex.new
end

#emit(tag, es, chain) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_unique_counter.rb', line 69

# Collects the value stored under @unique_key from every record of the event
# stream and appends those values to @counts.
#
# Records whose value is nil are skipped. Each kept value is converted to a
# String and stored as an ASCII-8BIT (binary) copy.
#
# @param tag tag of the incoming events (not used here)
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain, advanced after the values are stored
# @return the result of chain.next
def emit(tag, es, chain)
  collected = []

  es.each do |_time, record|
    value = record[@unique_key]
    next if value.nil?

    # String#to_s returns the receiver itself, so re-tagging it directly would
    # change the encoding of the record's own value (and raise on a frozen
    # String). Work on a copy instead.
    collected << value.to_s.dup.force_encoding('ASCII-8BIT')
  end
  @mutex.synchronize { @counts += collected }

  chain.next
end

#flush_emit ⇒ Object



45
46
47
48
49
# File 'lib/fluent/plugin/out_unique_counter.rb', line 45

# Emits the number of distinct values collected since the previous flush and
# starts a new, empty collection.
#
# The emitted record is {'unique_count' => N}, sent to @tag with the current
# Fluent::Engine.now as its time.
def flush_emit
  # Swap the buffer under the same mutex #emit holds while appending.
  # #emit replaces @counts with a read-modify-write, so an unlocked swap could
  # be overwritten and the flushed values counted a second time.
  flushed = @mutex.synchronize do
    pending = @counts
    @counts = []
    pending
  end
  # Deduplicate outside the lock so #emit is not blocked while counting.
  message = { 'unique_count' => flushed.uniq.size }
  Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
end

#shutdown ⇒ Object



39
40
41
42
43
# File 'lib/fluent/plugin/out_unique_counter.rb', line 39

# Runs the parent class's shutdown logic, then stops the watcher thread held
# in @watcher and waits until it has finished.
def shutdown
  super
  # Thread#kill is the same operation as #terminate and returns the thread,
  # so the join can be chained directly.
  @watcher.kill.join
end

#start ⇒ Object



34
35
36
37
# File 'lib/fluent/plugin/out_unique_counter.rb', line 34

# Runs the parent class's start logic, then launches the background watcher
# thread through #start_watch.
def start
  super
  start_watch
end

#start_watch ⇒ Object



51
52
53
54
# File 'lib/fluent/plugin/out_unique_counter.rb', line 51

# Spawns the background thread that runs #watch and keeps a handle to it in
# @watcher. Meant for internal use and for tests only.
#
# @return [Thread] the newly started watcher thread
def start_watch
  @watcher = Thread.new { watch }
end

#watch ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/out_unique_counter.rb', line 56

# Body of the watcher thread: polls twice a second and calls #flush_emit each
# time at least @count_interval has elapsed since the last flush.
# Never returns on its own; the loop has no exit condition.
def watch
  # Kept in an instance variable with a public reader so tests can inspect it.
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    elapsed = Fluent::Engine.now - @last_checked
    next unless elapsed >= @count_interval

    # Capture the time before flushing so the next interval is measured from
    # the start of this flush, not from its end.
    flush_started = Fluent::Engine.now
    flush_emit
    @last_checked = flush_started
  end
end