Class: Fluent::UniqueCounterOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::UniqueCounterOutput
- Defined in:
- lib/fluent/plugin/out_unique_counter.rb
Constant Summary
- UNITS = { minutes: 60, hours: 3600, days: 86400 }
Instance Attribute Summary
-
#counts ⇒ Object
Returns the value of attribute counts.
-
#last_checked ⇒ Object
Returns the value of attribute last_checked.
-
#tick ⇒ Object
Returns the value of attribute tick.
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush_emit ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_watch ⇒ Object
- #watch ⇒ Object
Instance Attribute Details
#counts ⇒ Object
Returns the value of attribute counts.
13 14 15 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 13
#
# Reader for the values collected since the last flush.
#
# @return [Object] the current value of @counts
def counts
  @counts
end
#last_checked ⇒ Object
Returns the value of attribute last_checked.
14 15 16 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 14
#
# Reader for the time recorded by the watcher at its last flush check.
#
# @return [Object] the current value of @last_checked
def last_checked
  @last_checked
end
#tick ⇒ Object
Returns the value of attribute tick.
12 13 14 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 12
#
# Reader for the @tick attribute.
#
# @return [Object] the current value of @tick
def tick
  @tick
end
Instance Method Details
#configure(conf) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 22
#
# Validates the plugin configuration and initialises the counting state.
#
# When @unit is set it must name a key of UNITS; the matching number of
# seconds then replaces @count_interval. When @unit is not set,
# @count_interval is left untouched.
# NOTE(review): @unit is presumably populated by the parent's configure
# (called via super) -- confirm against the plugin's config_param list.
#
# @param conf [Object] configuration handed over by Fluentd
# @raise [Fluent::ConfigError] if @unit is set but is not a key of UNITS
def configure(conf)
  super

  if @unit
    interval = UNITS[@unit.to_sym]
    if interval.nil?
      raise Fluent::ConfigError, "'unit' must be one of minutes/hours/days"
    end
    @count_interval = interval
  end

  # Fresh buffer of observed values, plus the lock that guards it.
  @counts = []
  @mutex = Mutex.new
end
#emit(tag, es, chain) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 69
#
# Collects the value stored under @unique_key from every record of the
# event stream and appends those values to @counts.
#
# Records that have no value (nil) for @unique_key are skipped. Each value
# is converted to a String and tagged as ASCII-8BIT -- presumably so that
# equal byte sequences compare equal regardless of their source encoding
# when the values are de-duplicated later.
#
# @param tag [String] tag of the incoming events (not used here)
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; advanced once the values are stored
def emit(tag, es, chain)
  values = []
  es.each do |_time, record|
    value = record[@unique_key]
    next if value.nil?

    # String#to_s returns the receiver itself, so re-encode a copy: calling
    # force_encoding directly would change the record's own string (visible
    # to anything else holding that record) and raise on frozen strings.
    values << value.to_s.dup.force_encoding('ASCII-8BIT')
  end

  # Append in place under the lock; `@counts += values` would copy the
  # whole accumulated array on every call.
  @mutex.synchronize { @counts.concat(values) }
  chain.next
end
#flush_emit ⇒ Object
45 46 47 48 49 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 45
#
# Emits the number of distinct values collected so far and resets the
# collection.
#
# The emitted record has the single key 'unique_count' and is sent through
# Fluent::Engine under @tag with the current engine time.
#
# @return [Object] whatever Fluent::Engine.emit returns
def flush_emit
  # Swap the buffer out under the same lock #emit uses, so a concurrent
  # append can neither be lost nor re-add values that were already flushed.
  flushed = @mutex.synchronize do
    pending = @counts
    @counts = []
    pending
  end

  # Count and emit outside the lock to keep the critical section short.
  message = { 'unique_count' => flushed.uniq.count }
  Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
end
#shutdown ⇒ Object
39 40 41 42 43 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 39
#
# Stops the plugin: runs the parent's shutdown, then terminates the watcher
# thread and waits for it to finish.
def shutdown
  super
  # Thread#terminate returns the thread itself, so join can be chained.
  @watcher.terminate.join
end
#start ⇒ Object
34 35 36 37 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 34
#
# Starts the plugin: runs the parent's start, then launches the watcher
# thread through #start_watch.
def start
  super
  start_watch
end
#start_watch ⇒ Object
51 52 53 54 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 51
#
# Spawns the background thread that runs #watch and stores it in @watcher.
# Intended for internal use and tests only.
#
# @return [Thread] the newly started watcher thread
def start_watch
  @watcher = Thread.new { watch }
end
#watch ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/out_unique_counter.rb', line 56
#
# Body of the watcher thread: polls every 0.5 seconds and calls #flush_emit
# whenever at least @count_interval has elapsed since @last_checked.
#
# This loop never returns on its own; #shutdown ends it by terminating the
# thread.
def watch
  # Kept in an instance variable (publicly readable) so tests can inspect it.
  @last_checked = Fluent::Engine.now

  while true
    sleep 0.5

    elapsed = Fluent::Engine.now - @last_checked
    next unless elapsed >= @count_interval

    # Capture the time before flushing so the next interval is measured
    # from the start of this flush.
    checked_at = Fluent::Engine.now
    flush_emit
    @last_checked = checked_at
  end
end