Class: Fluent::TDCounterOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::TDCounterOutput
- Defined in:
- lib/fluent/plugin/out_td_counter.rb
Constant Summary collapse
- COUNT_FIELD = 'count'
- BYTES_FIELD = 'bytes'
Instance Attribute Summary collapse
-
#counts ⇒ Object
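Returns the value of attribute counts. (The description extracted here consists of two config_param declarations that appear to have been picked up from comments in the source: `config_param :aggregate, :string, :default => 'tag'` and `config_param :exact_count, :bool, :default => true`.)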
-
#output ⇒ Object
readonly
Returns the value of attribute output.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #count_initialized ⇒ Object
- #countup(tag, counts, bytes) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush_counts ⇒ Object
-
#initialize ⇒ TDCounterOutput
constructor
A new instance of TDCounterOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ TDCounterOutput
Returns a new instance of TDCounterOutput.
11 12 13 14 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 11

# Builds a new TDCounterOutput.
#
# The parent initializer runs first; afterwards @output is cleared so the
# plugin starts without a nested output until #configure assigns one.
def initialize
  super

  @output = nil
end
Instance Attribute Details
#counts ⇒ Object
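Returns the value of attribute counts. (The description extracted here consists of two config_param declarations that appear to have been picked up from comments in the source: `config_param :aggregate, :string, :default => 'tag'` and `config_param :exact_count, :bool, :default => true`.)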
8 9 10 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 8

# Reader for @counts.
#
# #configure sets @counts to an empty Hash, #countup adds one entry per tag,
# and #flush_counts replaces it with a fresh Hash.
#
# @return [Hash, nil] the current counter table (nil until it is assigned)
def counts
  @counts
end
#output ⇒ Object (readonly)
Returns the value of attribute output.
9 10 11 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 9

# Reader for @output, the nested output created from a <store> section in
# #configure. It is nil when no <store> section was configured.
#
# @return [Object, nil] the nested output, if any
def output
  @output
end
Instance Method Details
#configure(conf) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 16

# Configures the plugin from +conf+ and resets the counter state.
#
# Every <store> child element is turned into a nested output through
# Plugin.new_output and configured with that element. Only a single nested
# output is kept in @output, so when several <store> sections are present the
# last one wins. That case is now reported with a warning instead of dropping
# the earlier outputs silently.
#
# @param conf the configuration element for this plugin
# @raise [ConfigError] if a <store> section has no 'type' parameter
def configure(conf)
  super

  stores = conf.elements.select { |e| e.name == 'store' }

  # #start, #emit and #shutdown only ever touch @output, so any store other
  # than the last one would be configured and then never used.
  if stores.size > 1
    $log.warn "out_td_counter keeps only one <store> directive; #{stores.size} were given and only the last one will be used"
  end

  stores.each do |e|
    type = e['type']
    raise ConfigError, "Missing 'type' parameter on <store> directive" unless type

    $log.debug "adding store type = #{type.dump}"

    @output = Plugin.new_output(type)
    @output.configure(e)
  end

  # Fresh counter table and the lock that guards it.
  @counts = {}
  @mutex = Mutex.new
end
#count_initialized ⇒ Object
59 60 61 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 59

# Builds the zeroed counter entry used for a tag that has not been seen yet.
#
# @return [Hash] a new Hash mapping COUNT_FIELD and BYTES_FIELD to 0
def count_initialized
  zeroed = {}
  zeroed[COUNT_FIELD] = 0
  zeroed[BYTES_FIELD] = 0
  zeroed
end
#countup(tag, counts, bytes) ⇒ Object
63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 63

# Adds +counts+ records and +bytes+ bytes to the running totals for +tag+.
#
# The update runs while holding @mutex. A tag seen for the first time gets a
# zeroed entry from #count_initialized before the increments are applied.
#
# @param tag    key of the entry in @counts
# @param counts amount added to the entry's COUNT_FIELD
# @param bytes  amount added to the entry's BYTES_FIELD
# @return the updated BYTES_FIELD total for +tag+
def countup(tag, counts, bytes)
  @mutex.synchronize do
    entry = (@counts[tag] ||= count_initialized)
    entry[COUNT_FIELD] += counts
    entry[BYTES_FIELD] += bytes
  end
end
#emit(tag, es, chain) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 81

# Counts the events in +es+ for +tag+, then passes them on.
#
# Each record contributes 1 to the record count and the byte size of its
# msgpack serialization to the byte count; both totals are handed to
# #countup. When a nested output is configured the stream is emitted to it
# as well, and finally the chain is advanced.
#
# @param tag   tag of the event stream
# @param es    event stream; iterated as (time, record) pairs
# @param chain output chain; receives #next once this method is done
def emit(tag, es, chain)
  count = 0
  bytes = 0
  # TODO: if bytes is not needed, use Event#num_records to reduce processing time
  es.each do |_time, record|
    count += 1
    bytes += record.to_msgpack.bytesize
  end
  countup(tag, count, bytes)

  # NOTE(review): chain is passed to the nested output and chain.next is also
  # called below -- confirm the chain is not meant to be advanced only once.
  @output.emit(tag, es, chain) unless @output.nil?
  chain.next
end
#flush_counts ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 72

# Hands back the counters collected so far and starts a new, empty table.
#
# The swap happens while holding @mutex, so an update made through #countup
# lands either in the returned Hash or in the fresh one.
#
# @return [Hash] the counter table that was current before the call
def flush_counts
  @mutex.synchronize do
    snapshot = @counts
    @counts = {}
    snapshot
  end
end
#shutdown ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 48

# Stops the plugin: the nested output (if one was configured) is shut down
# first, then the parent class's shutdown runs.
def shutdown
  @output.shutdown unless @output.nil?

  super
end
#start ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_td_counter.rb', line 36

# Starts the plugin after the parent class's start has run.
#
# Logs a warning when check_td_monitor_agent returns a falsy value, then
# starts the nested output if one was configured.
def start
  super

  # check_td_monitor_agent is defined outside this listing; judging by the
  # warning text it presumably reports whether in_td_monitor_agent is
  # configured -- verify against its definition.
  agent_found = check_td_monitor_agent
  $log.warn "in_td_monitor_agent not found. If you want to use out_td_counter, then you should configure in_td_monitor_agent in same configuration" unless agent_found

  @output.start unless @output.nil?
end