Class: Fluent::GrepCountFilterOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_grepcount_filter.rb

Constant Summary collapse

REGEXP_MAX_NUM =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#countObject

Returns the value of attribute count.



16
17
18
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 16

def count
  @count
end

#intervalObject

Returns the value of attribute interval.



14
15
16
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 14

def interval
  @interval
end

#last_checkedObject

Returns the value of attribute last_checked.



17
18
19
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 17

def last_checked
  @last_checked
end

#last_countObject

Returns the value of attribute last_count.



18
19
20
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 18

def last_count
  @last_count
end

#last_recordObject

Returns the value of attribute last_record.



20
21
22
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 20

def last_record
  @last_record
end

#last_tagObject

Returns the value of attribute last_tag.



19
20
21
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 19

def last_tag
  @last_tag
end

#watcherObject

Returns the value of attribute watcher.



15
16
17
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 15

def watcher
  @watcher
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 22

def configure(conf)
  super

  @interval = @count_interval.to_i
  @count = @last_count = 0

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["regexp#{i}"]
    key, regexp = conf["regexp#{i}"].split(/ /, 2)
    raise ConfigError, "regexp#{i} does not contain 2 parameters" unless regexp
    raise ConfigError, "regexp#{i} contains a duplicated key, #{key}" if @regexps[key]
    @regexps[key] = Regexp.compile(regexp)
  end

  @excludes = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["exclude#{i}"]
    key, exclude = conf["exclude#{i}"].split(/ /, 2)
    raise ConfigError, "exclude#{i} does not contain 2 parameters" unless exclude
    raise ConfigError, "exclude#{i} contains a duplicated key, #{key}" if @excludes[key]
    @excludes[key] = Regexp.compile(exclude)
  end
end

#emit(tag, es, chain) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 87

def emit(tag, es, chain)
  matched = false
  es.each { |time, record|

    # grep filtering
    catch(:break_loop) do
      @regexps.each do |key, regexp|
        throw :break_loop unless match(regexp, record[key].to_s)
      end
      @excludes.each do |key, exclude|
        throw :break_loop if match(exclude, record[key].to_s)
      end
      matched = true
    end

    if !matched
      next
    end

    @count += 1
    @last_tag = tag
    @last_record = record
  }
  chain.next
end

#flush_emitObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 70

def flush_emit
  output = nil
  if @last_count >= @threshold
    output = @last_record
    if @output_setting
      placeholder_values = {
        "count"     => @last_count,
        "interval"  => @interval,
        "threshold" => @threshold
      }
      output = reform(@last_record, placeholder_values)
    end
  end
  router.emit(@last_tag, @last_checked, output)
  @count = @last_count = 0
end

#match(regexp, string) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 113

def match(regexp, string)
  begin
    return regexp.match(string)
  rescue ArgumentError => e
    raise e unless e.message.index("invalid byte sequence in") == 0
    string = replace_invalid_byte(string)
    retry
  end
  return true
end

#reform(record, placeholder_values) ⇒ Object



124
125
126
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 124

def reform(record, placeholder_values)
  record.merge!(placeholder_values)
end

#shutdownObject



52
53
54
55
56
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 52

def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#startObject



47
48
49
50
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 47

def start
  super
  @watcher = Thread.new(&method(:watch))
end

#watchObject



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 58

def watch
  @last_checked ||= Engine.now
  while true
    sleep 0.5
    if Engine.now - @last_checked >= @interval
      @last_checked = Engine.now
      @last_count = @count
      flush_emit
    end
  end
end