Class: Fluent::HistogramOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::HistogramOutput
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_histogram.rb
Instance Attribute Summary collapse
-
#flush_interval ⇒ Object
Returns the value of attribute flush_interval.
-
#hists ⇒ Object
Returns the value of attribute hists.
-
#remove_prefix_string ⇒ Object
Returns the value of attribute remove_prefix_string.
-
#zero_hist ⇒ Object
Returns the value of attribute zero_hist.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush ⇒ Object
- #flush_emit(now) ⇒ Object
- #generate_output(flushed) ⇒ Object
- #increment(tag, key) ⇒ Object
-
#initialize ⇒ HistogramOutput
constructor
fluentd output plugin’s methods.
-
#initialize_hists(tags = nil) ⇒ Object
Histogram plugin’s method.
- #shutdown ⇒ Object
- #start ⇒ Object
- #tagging(flushed) ⇒ Object
- #watch ⇒ Object
Constructor Details
#initialize ⇒ HistogramOutput
fluentd output plugin’s methods
28 29 30 |
# File 'lib/fluent/plugin/out_histogram.rb', line 28
# Constructor (one of the fluentd output plugin's methods).
# Performs no work of its own; construction is delegated to the parent class.
def initialize
  super
end
Instance Attribute Details
#flush_interval ⇒ Object
Returns the value of attribute flush_interval.
21 22 23 |
# File 'lib/fluent/plugin/out_histogram.rb', line 21
# Reader for the flush interval.
#
# @return [Object] the current value of @flush_interval
def flush_interval
  @flush_interval
end
#hists ⇒ Object
Returns the value of attribute hists.
22 23 24 |
# File 'lib/fluent/plugin/out_histogram.rb', line 22
# Reader for the per-tag histogram table.
#
# @return [Object] the current value of @hists
def hists
  @hists
end
#remove_prefix_string ⇒ Object
Returns the value of attribute remove_prefix_string.
24 25 26 |
# File 'lib/fluent/plugin/out_histogram.rb', line 24
# Reader for the tag prefix (including its trailing dot) that is stripped
# from incoming tags.
#
# @return [Object] the current value of @remove_prefix_string
def remove_prefix_string
  @remove_prefix_string
end
#zero_hist ⇒ Object
Returns the value of attribute zero_hist.
23 24 25 |
# File 'lib/fluent/plugin/out_histogram.rb', line 23
# Reader for the all-zero template histogram.
#
# @return [Object] the current value of @zero_hist
def zero_hist
  @zero_hist
end
Instance Method Details
#configure(conf) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_histogram.rb', line 32
# Validates the plugin configuration and derives the state used while
# counting.
#
# After delegating to the parent class, this method:
# * rejects a non-positive bin_num and a sampling_rate below 1,
# * warns when bin_num is below 100,
# * turns sampling on only when 'sampling_rate' is present in +conf+,
# * precomputes the dotted tag prefix/suffix strings,
# * builds the zero histogram, the empty histogram table, the sampling
#   counter, the per-hit increment (@tick) and the mutex.
#
# @param conf [Object] configuration; read here only via conf['sampling_rate']
# @raise [Fluent::ConfigError] if bin_num <= 0 or sampling_rate < 1
def configure(conf)
  super

  if @bin_num <= 0
    raise Fluent::ConfigError, 'bin_num must be > 0'
  end
  if @sampling_rate < 1
    raise Fluent::ConfigError, 'sampling_rate must be >= 1'
  end
  if @bin_num < 100
    $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome]
  end

  # Sampling is enabled only when the option was given explicitly.
  @sampling = true if conf['sampling_rate']

  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  @tag_suffix_string = '.' + @tag_suffix if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  @zero_hist = [0] * @bin_num
  @hists = initialize_hists

  # When sampling, each counted key stands for sampling_rate occurrences.
  @sampling_counter = 0
  @tick = @sampling ? @sampling_rate.to_i : 1

  @mutex = Mutex.new
end
#emit(tag, es, chain) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_histogram.rb', line 105
# Counts the value(s) found under @count_key for every record in the stream.
#
# The value is wrapped in an array and flattened, so a scalar, an array or a
# nested array of keys are all handled; a missing value is counted as nil.
# With sampling enabled only every @sampling_rate-th key is passed on to
# #increment.
#
# @param tag [Object] tag the records arrived with
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] advanced once, before the records are processed
def emit(tag, es, chain)
  chain.next

  es.each do |_time, record|
    [record[@count_key]].flatten.each do |key|
      unless @sampling
        increment(tag, key)
        next
      end

      @sampling_counter += 1
      next if @sampling_counter < @sampling_rate

      increment(tag, key)
      @sampling_counter = 0
    end
  end
end
#flush ⇒ Object
168 169 170 171 |
# File 'lib/fluent/plugin/out_histogram.rb', line 168
# Summarises the accumulated histograms and resets the table.
#
# The table is replaced by fresh histograms for the same set of tags, and
# the summary of the old table is returned with its tags rewritten by
# #tagging.
#
# @return [Hash] rewritten tag => summary produced by #generate_output
def flush
  previous = @hists
  summary = generate_output(previous)
  @hists = initialize_hists(previous.keys.dup)
  tagging(summary)
end
#flush_emit(now) ⇒ Object
173 174 175 176 177 178 |
# File 'lib/fluent/plugin/out_histogram.rb', line 173
# Flushes the histograms and emits one event per resulting tag.
#
# @param now [Object] timestamp attached to every emitted event
# @return [Hash] the flushed tag => data mapping that was emitted
def flush_emit(now)
  flush.each { |tag, data| Fluent::Engine.emit(tag, now, data) }
end
#generate_output(flushed) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/fluent/plugin/out_histogram.rb', line 150
# Builds the summary record for every histogram.
#
# For each tag the result holds the histogram itself (:hist, the same array
# object that was passed in), the sum of its bins (:sum), the mean bin value
# (:avg) and the standard deviation of the bins truncated to an integer
# (:sd). The arithmetic uses the `/` operator on the bin values as given, so
# integer bins produce integer (truncating) division for both the mean and
# the variance.
#
# @param flushed [Hash] tag => array of bin counts
# @return [Hash] tag => { :hist, :sum, :avg, :sd }
def generate_output(flushed)
  flushed.each_with_object({}) do |(tag, hist), output|
    total = hist.inject(:+)
    mean = total / hist.size
    squared_diffs = hist.map { |count| (mean - count)**2 }
    deviation = Math.sqrt(squared_diffs.inject(:+) / hist.size)

    output[tag] = { :hist => hist, :sum => total, :avg => mean, :sd => deviation.to_i }
  end
end
#increment(tag, key) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/fluent/plugin/out_histogram.rb', line 93
# Adds one observation of +key+ to the histogram kept for +tag+.
#
# The histogram is created from the zero template on first use. The centre
# bin is key.hash modulo @bin_num. For every width from 0 to @alpha, each bin
# from centre - width to centre + width (wrapping around the ends) is
# increased by @tick, so bins nearer the centre receive more. The bin
# updates run while holding @mutex.
#
# @param tag [Object] histogram to update
# @param key [Object] value being counted; only its #hash is used
def increment(tag, key)
  @hists[tag] ||= @zero_hist.dup
  center = key.hash % @bin_num

  @mutex.synchronize do
    (0..@alpha).each do |width|
      (-width).upto(width) do |offset|
        @hists[tag][(center + offset) % @bin_num] += @tick
      end
    end
  end
end
#initialize_hists(tags = nil) ⇒ Object
Histogram plugin’s method
83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/out_histogram.rb', line 83
# Histogram plugin's method: builds a fresh histogram table.
#
# Every given tag is mapped to its own copy of the zero histogram, so
# updating one tag's bins never affects another tag or the template.
#
# @param tags [#each, nil] tags to pre-create histograms for; nil yields an
#   empty table
# @return [Hash] tag => zeroed histogram
def initialize_hists(tags = nil)
  return {} unless tags

  tags.each_with_object({}) { |tag, hists| hists[tag] = @zero_hist.dup }
end
#shutdown ⇒ Object
74 75 76 77 78 |
# File 'lib/fluent/plugin/out_histogram.rb', line 74
# Stops the plugin: delegates to the parent class, then terminates the
# watcher thread and waits for it to finish.
def shutdown
  super
  @watcher.terminate.join
end
#start ⇒ Object
57 58 59 60 |
# File 'lib/fluent/plugin/out_histogram.rb', line 57
# Starts the plugin: delegates to the parent class, then runs #watch on a
# new thread kept in @watcher.
def start
  super
  @watcher = Thread.new { watch }
end
#tagging(flushed) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/fluent/plugin/out_histogram.rb', line 124
# Rewrites the tags of flushed data according to the tag options.
#
# When @tag is set, every entry is stored under that single tag, so a later
# entry replaces an earlier one. Otherwise each tag is rewritten as follows:
# 1. @input_tag_remove_prefix is cut off when the tag equals it or starts
#    with it followed by a dot and more characters;
# 2. @tag_prefix_string is prepended and @tag_suffix_string appended when
#    the respective options are set;
# 3. leading/trailing dots are removed and runs of dots collapse to one.
#
# @param flushed [Hash] input tag => data
# @return [Hash] rewritten tag => data
def tagging(flushed)
  flushed.each_with_object({}) do |(tag, hist), tagged|
    if @tag
      tagged[@tag] = hist
      next
    end

    rewritten = tag.dup
    strip_prefix = @input_tag_remove_prefix &&
                   (tag == @input_tag_remove_prefix ||
                    (tag.start_with?(@remove_prefix_string) && tag.length > @remove_prefix_length))
    # Only the bare prefix is cut here; the dot left behind is cleaned up by
    # the gsub! calls below.
    rewritten = rewritten[@input_tag_remove_prefix.length..-1] if strip_prefix

    rewritten = @tag_prefix_string + rewritten if @tag_prefix
    rewritten << @tag_suffix_string if @tag_suffix

    rewritten.gsub!(/(^\.+)|(\.+$)/, '')
    rewritten.gsub!(/(\.\.+)/, '.')

    tagged[rewritten] = hist
  end
end
#watch ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_histogram.rb', line 62
# Endless polling loop that triggers periodic flushes.
#
# Wakes every half second and, once at least @flush_interval has elapsed
# on the engine clock since the last flush, calls #flush_emit with a fresh
# timestamp and records that timestamp as the new reference point. The loop
# has no exit condition of its own.
def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    next if Fluent::Engine.now - @last_checked < @flush_interval

    current = Fluent::Engine.now
    flush_emit(current)
    @last_checked = current
  end
end