Class: Fluent::HistogramOutput

Inherits:
Output
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_histogram.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHistogramOutput

fluentd output plugin’s methods



30
31
32
# File 'lib/fluent/plugin/out_histogram.rb', line 30

def initialize
  super
end

Instance Attribute Details

#flush_intervalObject

Returns the value of attribute flush_interval.



23
24
25
# File 'lib/fluent/plugin/out_histogram.rb', line 23

def flush_interval
  @flush_interval
end

#histsObject

Returns the value of attribute hists.



24
25
26
# File 'lib/fluent/plugin/out_histogram.rb', line 24

def hists
  @hists
end

#remove_prefix_stringObject

Returns the value of attribute remove_prefix_string.



26
27
28
# File 'lib/fluent/plugin/out_histogram.rb', line 26

def remove_prefix_string
  @remove_prefix_string
end

#zero_histObject

Returns the value of attribute zero_hist.



25
26
27
# File 'lib/fluent/plugin/out_histogram.rb', line 25

def zero_hist
  @zero_hist
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_histogram.rb', line 34

def configure(conf)
  super

  raise Fluent::ConfigError, 'bin_num must be > 0' if @bin_num <= 0
  raise Fluent::ConfigError, 'sampling_rate must be >= 1' if @sampling_rate < 1
  $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome] if @bin_num < 100
  @sampling = true if !!conf['sampling_rate']

  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  @tag_suffix_string = '.' + @tag_suffix if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  @zero_hist = [0] * @bin_num

  @hists = initialize_hists
  @sampling_counter = 0
  @tick = @sampling ? @sampling_rate.to_i : 1

  if @alpha > 0
    @revalue = (@alpha+1)**2 if @alpha != 0
  else
    @disable_revalue = true
  end

  @mutex = Mutex.new

end

#emit(tag, es, chain) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/out_histogram.rb', line 115

def emit(tag, es, chain)
  chain.next

  es.each do |time, record|
    keys = record[@count_key]
    if keys.instance_of? Hash
      keys.each do |k, v|
        if !@sampling
          increment(tag, k, v)
        else
          @sampling_counter += v
          if @sampling_counter >= @sampling_rate
            increment(tag, k, v)
            @sampling_counter = 0
          end
        end
      end
    else
      [keys].flatten.each do |k|
        if !@sampling
          increment(tag, k)
        else
          @sampling_counter += 1
          if @sampling_counter >= @sampling_rate
            increment(tag, k)
            @sampling_counter = 0
          end
        end
      end
    end
  end # es.each }}}
end

#flushObject



199
200
201
202
# File 'lib/fluent/plugin/out_histogram.rb', line 199

def flush
  flushed, @hists = generate_output(@hists), initialize_hists(@hists.keys.dup)
  tagging(flushed)
end

#flush_emit(now) ⇒ Object



204
205
206
207
208
209
# File 'lib/fluent/plugin/out_histogram.rb', line 204

def flush_emit(now)
  flushed = flush
  flushed.each do |tag, data|
    Fluent::Engine.emit(tag, now, data)
  end
end

#generate_output(flushed) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/fluent/plugin/out_histogram.rb', line 174

def generate_output(flushed)
  output = {}
  flushed.each do |tag, hist|
    output[tag] = {}
    act_hist = hist.dup.select!{|v| v > 0}
    if act_hist.size == 0 # equal to zero_hist
      sum = 0
      avg = 0
      sd = 0
    else
      sum = act_hist.inject(:+)
      avg = sum / act_hist.size
      sd = act_hist.instance_eval do
        sigmas = map { |n| (avg - n)**2 }
        Math.sqrt(sigmas.inject(:+) / size)
      end
    end
    output[tag][:hist] = hist if @out_include_hist
    output[tag][:sum] = @disable_revalue ? sum : sum / @revalue
    output[tag][:avg] = @disable_revalue ? avg : avg / @revalue
    output[tag][:sd] = sd.to_i
  end
  output
end

#increment(tag, key, v = 1) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_histogram.rb', line 101

def increment(tag, key, v=1)
  @hists[tag] ||= @zero_hist.dup

  # id = key.hash % @bin_num
  id = key.to_s[0..9].codepoints.collect{|cp| cp}.join().to_i % @bin_num # attention to long key(length > 10)
  @mutex.synchronize {
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |al|
        @hists[tag][(id + al) % @bin_num] += @tick * v
      end
    end
  }
end

#initialize_hists(tags = nil) ⇒ Object

Histogram plugin’s method



91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_histogram.rb', line 91

def initialize_hists(tags=nil)
  hists = {}
  if tags
    tags.each do |tag|
      hists[tag] = @zero_hist.dup
    end
  end
  hists
end

#shutdownObject



82
83
84
85
86
# File 'lib/fluent/plugin/out_histogram.rb', line 82

def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#startObject



65
66
67
68
# File 'lib/fluent/plugin/out_histogram.rb', line 65

def start
  super
  @watcher = Thread.new(&method(:watch))
end

#tagging(flushed) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/out_histogram.rb', line 148

def tagging(flushed)
  tagged = {}
  tagged = Hash[ flushed.map do |tag, hist|
    tagged_tag = tag.dup
    if @tag
      tagged_tag = @tag
    else
      if @input_tag_remove_prefix &&
        ( ( tag.start_with?(@remove_prefix_string) &&
           tag.length > @remove_prefix_length ) ||
           tag == @input_tag_remove_prefix)
        tagged_tag = tagged_tag[@input_tag_remove_prefix.length..-1]
      end

      tagged_tag = @tag_prefix_string + tagged_tag if @tag_prefix
      tagged_tag << @tag_suffix_string if @tag_suffix

      tagged_tag.gsub!(/(^\.+)|(\.+$)/, '')
      tagged_tag.gsub!(/(\.\.+)/, '.')
    end

    [tagged_tag, hist]
  end ]
  tagged
end

#watchObject



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_histogram.rb', line 70

def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    if Fluent::Engine.now - @last_checked >= @flush_interval
      now = Fluent::Engine.now
      flush_emit(now)
      @last_checked = now
    end
  end
end