Class: Fluent::TDCounterOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_td_counter.rb

Constant Summary collapse

COUNT_FIELD =
'count'
BYTES_FIELD =
'bytes'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TDCounterOutput

Returns a new instance of TDCounterOutput.



11
12
13
14
# File 'lib/fluent/plugin/out_td_counter.rb', line 11

# Creates the plugin with no store attached. +configure+ assigns @output
# when the configuration contains a <store> section.
def initialize
  super
  @output = nil
end

Instance Attribute Details

#counts ⇒ Object

config_param :aggregate, :string, :default => 'tag'
config_param :exact_count, :bool, :default => true



8
9
10
# File 'lib/fluent/plugin/out_td_counter.rb', line 8

# Returns the live per-tag tally hash. +countup+ fills it with
# tag => { COUNT_FIELD => n, BYTES_FIELD => n } entries while holding @mutex;
# this reader hands back the same object without taking the lock.
def counts
  @counts
end

#output ⇒ Object (readonly)

Returns the value of attribute output.



9
10
11
# File 'lib/fluent/plugin/out_td_counter.rb', line 9

# Returns the store plugin created by +configure+, or nil when no <store>
# section was configured.
def output
  @output
end

Instance Method Details

#configure(conf) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_td_counter.rb', line 16

# Reads the plugin configuration, instantiates the plugin named by each
# <store> section, and resets the counting state.
#
# Every <store> section is instantiated and configured in order, but @output
# is overwritten each time, so only the last one is retained.
#
# @param conf the configuration element for this plugin; its +elements+ are
#   scanned for sections named 'store'
# @raise [ConfigError] when a <store> section has no 'type' parameter
def configure(conf)
  super

  conf.elements.each do |section|
    next unless section.name == 'store'

    store_type = section['type']
    raise ConfigError, "Missing 'type' parameter on <store> directive" unless store_type

    $log.debug "adding store type = #{store_type.dump}"

    @output = Plugin.new_output(store_type)
    @output.configure(section)
  end

  # Fresh tallies and the lock that guards them.
  @counts = {}
  @mutex = Mutex.new
end

#count_initialized ⇒ Object



59
60
61
# File 'lib/fluent/plugin/out_td_counter.rb', line 59

# Builds a new zeroed tally: one entry per tracked field, each starting at 0.
# A fresh hash is returned on every call so tags never share a tally.
def count_initialized
  [COUNT_FIELD, BYTES_FIELD].each_with_object({}) do |field, tally|
    tally[field] = 0
  end
end

#countup(tag, counts, bytes) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/out_td_counter.rb', line 63

# Adds +counts+ records and +bytes+ bytes to the tally kept for +tag+,
# creating a zeroed tally the first time a tag is seen. The update runs
# while holding @mutex.
#
# @param tag key under which the tally is stored in @counts
# @param counts amount added to the COUNT_FIELD entry
# @param bytes amount added to the BYTES_FIELD entry
# @return the tag's updated BYTES_FIELD value
def countup(tag, counts, bytes)
  @mutex.synchronize do
    tally = (@counts[tag] ||= count_initialized)
    tally[COUNT_FIELD] += counts
    tally[BYTES_FIELD] += bytes
  end
end

#emit(tag, es, chain) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_td_counter.rb', line 81

# Tallies the records in +es+ and their MessagePack-serialized size under
# +tag+, then passes the stream on.
#
# The output chain is advanced exactly once per call. When a store is
# configured, the chain is handed to it and the store is responsible for
# advancing it (Fluentd outputs are expected to call +chain.next+ from
# +emit+); calling +chain.next+ again afterwards would step the chain twice
# for a single event stream. Without a store, the chain is advanced here.
#
# @param tag tag of the event stream; used as the tally key
# @param es event stream yielding (time, record) pairs from +each+
# @param chain output chain to advance once this output is done
# @return the result of the store's +emit+ when a store is configured,
#   otherwise the result of +chain.next+
def emit(tag, es, chain)
  count = 0
  bytes = 0

  # TODO: if bytes is not needed, use Event#num_records to reduce processing time
  es.each { |_time, record|
    count += 1
    bytes += record.to_msgpack.bytesize
  }

  countup(tag, count, bytes)

  # Delegate the chain to the store instead of advancing it a second time.
  return @output.emit(tag, es, chain) unless @output.nil?

  chain.next
end

#flush_counts ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_td_counter.rb', line 72

# Hands back everything tallied so far and starts a new, empty tally.
# The swap happens while holding @mutex.
#
# @return the hash that was in @counts before the swap
def flush_counts
  @mutex.synchronize do
    drained = @counts
    @counts = {}
    drained
  end
end

#shutdown ⇒ Object



48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_td_counter.rb', line 48

# Shuts down the configured store, if any, and then runs the parent's
# shutdown.
#
# @return whatever the parent's +shutdown+ returns
def shutdown
  store = @output
  store.shutdown unless store.nil?

  super
end

#start ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/out_td_counter.rb', line 36

# Runs the parent's start, warns when +check_td_monitor_agent+ reports that
# in_td_monitor_agent is not present, and then starts the configured store,
# if any.
#
# @return the store's +start+ result, or nil when no store is configured
def start
  super

  agent_found = check_td_monitor_agent
  $log.warn "in_td_monitor_agent not found. If you want to use out_td_counter, then you should configure in_td_monitor_agent in same configuration" unless agent_found

  @output.start unless @output.nil?
end