Class: Fluent::NumericCounterOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_numeric_counter.rb

Constant Summary collapse

PATTERN_MAX_NUM =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeNumericCounterOutput

Returns a new instance of NumericCounterOutput.



4
5
6
7
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 4

def initialize
  super
  require 'pathname'
end

Instance Attribute Details

#countsObject

Returns the value of attribute counts.



30
31
32
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 30

def counts
  @counts
end

#last_checkedObject

Returns the value of attribute last_checked.



31
32
33
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 31

def last_checked
  @last_checked
end

#patternsObject

Returns the value of attribute patterns.



34
35
36
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 34

def patterns
  @patterns
end

#saved_atObject

Returns the value of attribute saved_at.



33
34
35
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 33

def saved_at
  @saved_at
end

#saved_durationObject

Returns the value of attribute saved_duration.



32
33
34
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 32

def saved_duration
  @saved_duration
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 53

def configure(conf)
  super

  if @unit
    @count_interval = case @unit
                      when 'minute' then 60
                      when 'hour' then 3600
                      when 'day' then 86400
                      else
                        raise Fluent::ConfigError, 'unit must be one of minute/hour/day'
                      end
  end

  @aggregate = @aggregate.to_sym
  raise Fluent::ConfigError, "numeric_counter allows tag/all to aggregate unit" unless [:tag, :all].include?(@aggregate)

  @patterns = [[0, 'unmatched', nil, nil]] # counts-index, name, low, high
  pattern_names = ['unmatched']

  invalids = conf.keys.select{|k| k =~ /^pattern(\d+)$/ and not (1..PATTERN_MAX_NUM).include?($1.to_i)}
  if invalids.size > 0
    log.warn "invalid number patterns (valid pattern number:1-#{PATTERN_MAX_NUM}):" + invalids.join(",")
  end
  (1..PATTERN_MAX_NUM).each do |i|
    next unless conf["pattern#{i}"]
    name,low,high = conf["pattern#{i}"].split(/ +/, 3)
    @patterns.push([i, name, parse_num(low), parse_num(high)])
    pattern_names.push(name)
  end
  pattern_index_list = conf.keys.select{|s| s =~ /^pattern\d$/}.map{|v| (/^pattern(\d)$/.match(v))[1].to_i}
  unless pattern_index_list.reduce(true){|v,i| v and @patterns[i]}
    raise Fluent::ConfigError, "jump of pattern index found"
  end
  unless @patterns.length == pattern_names.uniq.length
    raise Fluent::ConfigError, "duplicated pattern names found"
  end
  @patterns[1..-1].each do |index, name, low, high|
    raise Fluent::ConfigError, "numbers of low/high missing" if low.nil?
    raise Fluent::ConfigError, "unspecified high threshold allowed only in last pattern" if high.nil? and index != @patterns.length - 1
  end
  
  if @output_per_tag
    raise Fluent::ConfigError, "tag_prefix must be specified with output_per_tag" unless @tag_prefix
    @tag_prefix_string = @tag_prefix + '.'
  end

  if @input_tag_remove_prefix
    @removed_prefix_string = @input_tag_remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  if @store_file
    f = Pathname.new(@store_file)
    if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
      raise Fluent::ConfigError, "#{@store_file} is not writable"
    end
  end

  @counts = count_initialized
  @mutex = Mutex.new
end

#count_initialized(keys = nil) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 128

def count_initialized(keys=nil)
  # counts['tag'][pattern_index_num] = count
  # counts['tag'][-1] = sum
  if @aggregate == :all
    {'all' => Array.new(@patterns.length + 1){|i| 0}}
  elsif keys
    values = Array.new(keys.length){|i|
      Array.new(@patterns.length + 1){|j| 0 }
    }
    Hash[[keys, values].transpose]
  else
    {}
  end
end

#countups(tag, counts) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 143

def countups(tag, counts)
  if @aggregate == :all
    tag = 'all'
  end

  @mutex.synchronize {
    @counts[tag] ||= [0] * (@patterns.length + 1)
    sum = 0
    counts.each_with_index do |count, i|
      sum += count
      @counts[tag][i] += count
    end
    @counts[tag][-1] += sum
  }
end

#emit(tag, es, chain) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 253

def emit(tag, es, chain)
  c = [0] * @patterns.length

  es.each do |time,record|
    value = record[@count_key]
    next if value.nil?

    value = value.to_f
    matched = false
    @patterns.each do |index, name, low, high|
      next if low.nil? or value < low or (not high.nil? and value >= high)
      c[index] += 1
      matched = true
      break
    end
    c[0] += 1 unless matched
  end
  countups(tag, c)

  chain.next
end

#flush(step) ⇒ Object

returns one message



213
214
215
216
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 213

def flush(step) # returns one message
  flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
  generate_output(flushed, step)
end

#flush_emit(step) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 223

def flush_emit(step)
  if @output_per_tag
    time = Fluent::Engine.now
    flush_per_tags(step).each do |tag,message|
      Fluent::Engine.emit(@tag_prefix_string + tag, time, message)
    end
  else
    message = flush(step)
    if message.keys.size > 0
      Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
    end
  end
end

#flush_per_tags(step) ⇒ Object

returns map of tag - message



218
219
220
221
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 218

def flush_per_tags(step) # returns map of tag - message
  flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
  generate_output_per_tags(flushed, step)
end

#generate_fields(step, target_counts, attr_prefix, output) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 166

def generate_fields(step, target_counts, attr_prefix, output)
  sum = if @outcast_unmatched
          target_counts[1..-2].inject(:+)
        else
          target_counts[-1]
        end
  messages = target_counts.delete_at(-1)

  target_counts.each_with_index do |count,i|
    name = @patterns[i][1]
    output[attr_prefix + name + '_count'] = count
    output[attr_prefix + name + '_rate'] = ((count * 100.0) / (1.00 * step)).floor / 100.0
    unless i == 0 and @outcast_unmatched
      output[attr_prefix + name + '_percentage'] = count * 100.0 / (1.00 * sum) if sum > 0
    end
    if @output_messages
      output[attr_prefix + 'messages'] = messages
    end
  end

  output    
end

#generate_output(counts, step) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 189

def generate_output(counts, step)
  if @aggregate == :all
    return generate_fields(step, counts['all'], '', {})
  end

  output = {}
  counts.keys.each do |tag|
    generate_fields(step, counts[tag], stripped_tag(tag) + '_', output)
  end
  output
end

#generate_output_per_tags(counts, step) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 201

def generate_output_per_tags(counts, step)
  if @aggregate == :all
    return {'all' => generate_fields(step, counts['all'], '', {})}
  end

  output_pairs = {}
  counts.keys.each do |tag|
    output_pairs[stripped_tag(tag)] = generate_fields(step, counts[tag], '', {})
  end
  output_pairs
end

#load_status(file_path, count_interval) ⇒ Object

Load internal status from a file

Parameters:

  • file_path (String)
  • count_interval (Interger)


301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 301

def load_status(file_path, count_interval)
  return unless (f = Pathname.new(file_path)).exist?

  begin
    f.open('rb') do |f|
      stored = Marshal.load(f)
      if stored[:aggregate] == @aggregate and
        stored[:count_key] == @count_key and
        stored[:patterns]  == @patterns

        if Fluent::Engine.now <= stored[:saved_at] + count_interval
          @counts = stored[:counts]
          @saved_at = stored[:saved_at]
          @saved_duration = stored[:saved_duration]

          # skip the saved duration to continue counting
          @last_checked = Fluent::Engine.now - @saved_duration
        else
          log.warn "out_datacounter: stored data is outdated. ignore stored data"
        end
      else
        log.warn "out_datacounter: configuration param was changed. ignore stored data"
      end
    end
  rescue => e
    log.warn "out_datacounter: Can't load store_file #{e.class} #{e.message}"
  end
end

#parse_num(str) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 41

def parse_num(str)
  if str.nil?
    nil
  elsif str =~ /^[-0-9]+$/
    str.to_i
  elsif str =~ /^[-.0-9]+$/
    str.to_f
  else
    Fluent::Config.size_value(str)
  end
end

#save_status(file_path) ⇒ Object

Store internal status into a file

Parameters:

  • file_path (String)


278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 278

def save_status(file_path)
  begin
    Pathname.new(file_path).open('wb') do |f|
      @saved_at = Fluent::Engine.now
      @saved_duration = @saved_at - @last_checked
      Marshal.dump({
        :counts           => @counts,
        :saved_at         => @saved_at,
        :saved_duration   => @saved_duration,
        :aggregate        => @aggregate,
        :count_key        => @count_key,
        :patterns         => @patterns,
      }, f)
    end
  rescue => e
    log.warn "out_datacounter: Can't write store_file #{e.class} #{e.message}"
  end
end

#shutdownObject



121
122
123
124
125
126
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 121

def shutdown
  super
  @watcher.terminate
  @watcher.join
  save_status(@store_file) if @store_file
end

#startObject



115
116
117
118
119
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 115

def start
  super
  load_status(@store_file, @count_interval) if @store_file
  start_watch
end

#start_watchObject



237
238
239
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 237

def start_watch
  @watcher = Thread.new(&method(:watch))
end

#stripped_tag(tag) ⇒ Object



159
160
161
162
163
164
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 159

def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix
  return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
  return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
  tag
end

#watchObject



241
242
243
244
245
246
247
248
249
250
251
# File 'lib/fluent/plugin/out_numeric_counter.rb', line 241

def watch
  @last_checked ||= Fluent::Engine.now
  while true
    sleep 0.5
    if Fluent::Engine.now - @last_checked >= @count_interval
      now = Fluent::Engine.now
      flush_emit(now - @last_checked)
      @last_checked = now
    end
  end
end