Class: Fluent::HistogramOutput

Inherits:
Output
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_histogram.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HistogramOutput

fluentd output plugin’s methods



28
29
30
# File 'lib/fluent/plugin/out_histogram.rb', line 28

# Constructs the plugin instance. No state is set up here; the call is simply
# forwarded to the superclass initializer.
def initialize
  super
end

Instance Attribute Details

#flush_interval ⇒ Object

Returns the value of attribute flush_interval.



21
22
23
# File 'lib/fluent/plugin/out_histogram.rb', line 21

# Reader for @flush_interval, the threshold the watch loop compares the
# elapsed engine time against before triggering a flush.
#
# @return the current value of @flush_interval
def flush_interval
  @flush_interval
end

#hists ⇒ Object

Returns the value of attribute hists.



22
23
24
# File 'lib/fluent/plugin/out_histogram.rb', line 22

# Reader for @hists, the hash that maps each input tag to its histogram
# (an array of bin counters copied from @zero_hist).
#
# @return the current value of @hists
def hists
  @hists
end

#remove_prefix_string ⇒ Object

Returns the value of attribute remove_prefix_string.



24
25
26
# File 'lib/fluent/plugin/out_histogram.rb', line 24

# Reader for @remove_prefix_string. configure assigns it
# (@input_tag_remove_prefix followed by a dot) only when
# @input_tag_remove_prefix is set.
#
# @return the current value of @remove_prefix_string
def remove_prefix_string
  @remove_prefix_string
end

#zero_hist ⇒ Object

Returns the value of attribute zero_hist.



23
24
25
# File 'lib/fluent/plugin/out_histogram.rb', line 23

# Reader for @zero_hist, the all-zero array of @bin_num entries built in
# configure and duplicated whenever a fresh histogram is needed.
#
# @return the current value of @zero_hist
def zero_hist
  @zero_hist
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/out_histogram.rb', line 32

# Validates the configured values and prepares the plugin's runtime state:
# tag rewrite strings, the zero-filled histogram template, the per-tag
# histogram table, the sampling counter/tick and the mutex used by increment.
#
# @param conf configuration element; forwarded to the superclass, and
#   conf['sampling_rate'] is read here to decide whether sampling is enabled
# @raise [Fluent::ConfigError] when bin_num is not positive or sampling_rate
#   is below 1
def configure(conf)
  super

  if @bin_num <= 0
    raise Fluent::ConfigError, 'bin_num must be > 0'
  end
  if @sampling_rate < 1
    raise Fluent::ConfigError, 'sampling_rate must be >= 1'
  end
  if @bin_num < 100
    $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome]
  end

  # Sampling is only switched on when sampling_rate is present in the config.
  if conf['sampling_rate']
    @sampling = true
  end

  if @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  end
  if @tag_suffix
    @tag_suffix_string = '.' + @tag_suffix
  end
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  # Template histogram; every per-tag histogram starts as a copy of it.
  @zero_hist = [0] * @bin_num
  @hists = initialize_hists

  @sampling_counter = 0
  # Each sampled event stands in for sampling_rate events, so it is weighted.
  if @sampling
    @tick = @sampling_rate.to_i
  else
    @tick = 1
  end

  @mutex = Mutex.new
end

#emit(tag, es, chain) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_histogram.rb', line 105

# Counts the values found under @count_key in every record of the stream.
#
# The chain is advanced first. Each value (arrays are flattened, so nested
# arrays contribute every element) is then passed to increment. When sampling
# is enabled only every @sampling_rate-th value is counted; the running
# position is kept in @sampling_counter across calls.
#
# @param tag   tag of the incoming event stream
# @param es    event stream yielding (time, record) pairs
# @param chain output chain; its #next is invoked before processing
def emit(tag, es, chain)
  chain.next

  es.each do |_time, record|
    [record[@count_key]].flatten.each do |key|
      unless @sampling
        increment(tag, key)
        next
      end

      @sampling_counter += 1
      next unless @sampling_counter >= @sampling_rate

      increment(tag, key)
      @sampling_counter = 0
    end
  end
end

#flush ⇒ Object



168
169
170
171
# File 'lib/fluent/plugin/out_histogram.rb', line 168

# Takes the current histograms, turns them into output records and resets
# the table to zeroed histograms for the same set of tags.
#
# @return the output records keyed by their rewritten tags (result of tagging)
def flush
  # Build the output from the current table before it is replaced.
  snapshot = generate_output(@hists)
  @hists = initialize_hists(@hists.keys.dup)
  tagging(snapshot)
end

#flush_emit(now) ⇒ Object



173
174
175
176
177
178
# File 'lib/fluent/plugin/out_histogram.rb', line 173

# Flushes the histograms and emits every resulting record through
# Fluent::Engine, all stamped with the same time.
#
# @param now time value passed to Fluent::Engine.emit for every record
# @return the flushed tag => data collection
def flush_emit(now)
  flush.each { |tag, data| Fluent::Engine.emit(tag, now, data) }
end

#generate_output(flushed) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fluent/plugin/out_histogram.rb', line 150

# Builds one output record per tag from the raw histograms.
#
# Each record holds the histogram itself (:hist, the same array object that
# was passed in), the total of all bins (:sum), the mean bin value (:avg) and
# the standard deviation of the bins (:sd). With integer bin counts all
# arithmetic is integer arithmetic: the mean and the variance are truncated
# by integer division, and the square root is truncated with to_i.
#
# An empty histogram produces zero for :sum, :avg and :sd instead of failing
# on a nil sum.
#
# @param flushed [Hash] tag => array of bin counts
# @return [Hash] tag => { :hist, :sum, :avg, :sd }
def generate_output(flushed)
  flushed.each_with_object({}) do |(tag, hist), output|
    if hist.empty?
      # inject(:+) would return nil here and the division below would raise.
      sum = avg = sd = 0
    else
      bins = hist.size
      sum = hist.inject(:+)
      avg = sum / bins
      squared_diffs = hist.map { |n| (avg - n)**2 }
      sd = Math.sqrt(squared_diffs.inject(:+) / bins).to_i
    end

    output[tag] = { :hist => hist, :sum => sum, :avg => avg, :sd => sd }
  end
end

#increment(tag, key) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_histogram.rb', line 93

# Adds one observation of +key+ to the histogram of +tag+.
#
# The bin is chosen as key.hash modulo @bin_num. For every alpha in
# 0..@alpha the bins from (bin - alpha) to (bin + alpha), wrapping around the
# ends of the array, are increased by @tick, so the centre bin receives the
# largest weight and its neighbours progressively less.
#
# The histogram for the tag is created on demand and looked up exactly once,
# inside the mutex. All updates then go through that local reference, so a
# concurrent flush that replaces @hists cannot make a later lookup return nil
# part-way through the update.
#
# @param tag tag whose histogram is updated
# @param key value to count; only its #hash is used
# @return [Range] the 0..@alpha range that was iterated
def increment(tag, key)
  id = key.hash % @bin_num
  @mutex.synchronize {
    hist = (@hists[tag] ||= @zero_hist.dup)
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |al|
        hist[(id + al) % @bin_num] += @tick
      end
    end
  }
end

#initialize_hists(tags = nil) ⇒ Object

Histogram plugin’s method



83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_histogram.rb', line 83

# Builds a fresh histogram table.
#
# @param tags collection of tags to pre-populate, or nil for an empty table
# @return [Hash] each given tag mapped to its own copy of @zero_hist
def initialize_hists(tags=nil)
  return {} unless tags

  tags.each_with_object({}) do |tag, fresh|
    fresh[tag] = @zero_hist.dup
  end
end

#shutdown ⇒ Object



74
75
76
77
78
# File 'lib/fluent/plugin/out_histogram.rb', line 74

# Stops the plugin: runs the superclass shutdown, then terminates the
# watcher thread created in start and waits for it to finish.
def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#start ⇒ Object



57
58
59
60
# File 'lib/fluent/plugin/out_histogram.rb', line 57

# Starts the plugin: runs the superclass start, then launches a background
# thread that executes watch. The thread is kept in @watcher so that
# shutdown can stop it.
def start
  super
  @watcher = Thread.new { watch }
end

#tagging(flushed) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/fluent/plugin/out_histogram.rb', line 124

# Rewrites the tags of the flushed records.
#
# When @tag is set every record is keyed by @tag (so later records replace
# earlier ones in the result). Otherwise, for each tag: the
# @input_tag_remove_prefix part is cut off when the tag equals it or extends
# it with a further dotted component, @tag_prefix_string is prepended and
# @tag_suffix_string appended when configured, and finally leading/trailing
# dots are stripped and runs of dots collapsed to one.
#
# @param flushed [Hash] tag => record
# @return [Hash] rewritten tag => record
def tagging(flushed)
  renamed = flushed.map do |tag, hist|
    if @tag
      out_tag = @tag
    else
      out_tag = tag.dup

      if @input_tag_remove_prefix
        extends_prefix = tag.start_with?(@remove_prefix_string) &&
                         tag.length > @remove_prefix_length
        if extends_prefix || tag == @input_tag_remove_prefix
          # The separating dot is left in place; it is cleaned up below.
          out_tag = out_tag[@input_tag_remove_prefix.length..-1]
        end
      end

      out_tag = @tag_prefix_string + out_tag if @tag_prefix
      out_tag << @tag_suffix_string if @tag_suffix

      out_tag = out_tag.gsub(/(^\.+)|(\.+$)/, '').gsub(/(\.\.+)/, '.')
    end

    [out_tag, hist]
  end

  Hash[renamed]
end

#watch ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_histogram.rb', line 62

# Body of the watcher thread. Polls the engine clock every half second and,
# once at least @flush_interval has elapsed since @last_checked, flushes and
# emits the histograms and records the flush time. Never returns on its own.
def watch
  @last_checked = Fluent::Engine.now
  loop do
    sleep 0.5
    next unless Fluent::Engine.now - @last_checked >= @flush_interval

    # The clock is read again so the emitted records carry the flush time.
    now = Fluent::Engine.now
    flush_emit(now)
    @last_checked = now
  end
end