Class: Fluent::GrepCountFilterOutput
- Inherits:
- Output
- Inheritance chain: Object → Output → Fluent::GrepCountFilterOutput
- Defined in:
- lib/fluent/plugin/out_grepcount_filter.rb
Constant Summary collapse
- REGEXP_MAX_NUM = 20
Instance Attribute Summary collapse
-
#count ⇒ Object
Returns the value of attribute count.
-
#interval ⇒ Object
Returns the value of attribute interval.
-
#last_checked ⇒ Object
Returns the value of attribute last_checked.
-
#last_count ⇒ Object
Returns the value of attribute last_count.
-
#last_record ⇒ Object
Returns the value of attribute last_record.
-
#last_tag ⇒ Object
Returns the value of attribute last_tag.
-
#watcher ⇒ Object
Returns the value of attribute watcher.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush_emit ⇒ Object
- #match(regexp, string) ⇒ Object
- #reform(record, placeholder_values) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #watch ⇒ Object
Instance Attribute Details
#count ⇒ Object
Returns the value of attribute count.
16 17 18 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 16

# Reader for the @count instance variable.
#
# @return [Object] whatever is currently stored in @count
def count
  @count
end
#interval ⇒ Object
Returns the value of attribute interval.
14 15 16 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 14

# Reader for the @interval instance variable.
#
# @return [Object] whatever is currently stored in @interval
def interval
  @interval
end
#last_checked ⇒ Object
Returns the value of attribute last_checked.
17 18 19 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 17

# Reader for the @last_checked instance variable.
#
# @return [Object] whatever is currently stored in @last_checked
def last_checked
  @last_checked
end
#last_count ⇒ Object
Returns the value of attribute last_count.
18 19 20 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 18

# Reader for the @last_count instance variable.
#
# @return [Object] whatever is currently stored in @last_count
def last_count
  @last_count
end
#last_record ⇒ Object
Returns the value of attribute last_record.
20 21 22 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 20

# Reader for the @last_record instance variable.
#
# @return [Object] whatever is currently stored in @last_record
def last_record
  @last_record
end
#last_tag ⇒ Object
Returns the value of attribute last_tag.
19 20 21 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 19

# Reader for the @last_tag instance variable.
#
# @return [Object] whatever is currently stored in @last_tag
def last_tag
  @last_tag
end
#watcher ⇒ Object
Returns the value of attribute watcher.
15 16 17 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 15

# Reader for the @watcher instance variable.
#
# @return [Object] whatever is currently stored in @watcher
def watcher
  @watcher
end
Instance Method Details
#configure(conf) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 22

# Reads the plugin configuration and prepares the counting state.
#
# After delegating to the superclass, converts @count_interval to an Integer
# interval, zeroes both counters and compiles every "regexpN" / "excludeN"
# entry (N in 1..REGEXP_MAX_NUM) into a key => Regexp table.
#
# @param conf [#[]] configuration read with string keys such as "regexp1"
# @raise [ConfigError] if an entry has no pattern part or repeats a key
def configure(conf)
  super

  @interval = @count_interval.to_i
  @count = @last_count = 0
  @regexps = compile_patterns(conf, "regexp")
  @excludes = compile_patterns(conf, "exclude")
end

# Builds a key => Regexp hash from the "<prefix>1".."<prefix>N" entries of conf.
# Each entry has the form "<key> <pattern>", split on the first space only.
def compile_patterns(conf, prefix)
  (1..REGEXP_MAX_NUM).each_with_object({}) do |i, patterns|
    name = "#{prefix}#{i}"
    value = conf[name]
    next unless value

    key, pattern = value.split(/ /, 2)
    raise ConfigError, "#{name} does not contain 2 parameters" unless pattern
    raise ConfigError, "#{name} contains a duplicated key, #{key}" if patterns[key]

    patterns[key] = Regexp.compile(pattern)
  end
end
private :compile_patterns
#emit(tag, es, chain) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 87

# Counts the records of an event stream that pass the grep filter.
#
# A record passes when every pattern in @regexps matches the string form of
# its field and no pattern in @excludes does. For each passing record @count
# is incremented and @last_tag / @last_record are updated.
#
# The match decision is made per record. Previously the flag was set once
# before the loop and never reset, so every record after the first match was
# counted whether it matched or not.
#
# @param tag [Object] tag of the incoming stream, stored in @last_tag
# @param es [#each] yields (time, record) pairs
# @param chain [#next] called once after the whole stream has been scanned
def emit(tag, es, chain)
  es.each do |_time, record|
    included = @regexps.all? { |key, regexp| match(regexp, record[key].to_s) }
    next unless included

    excluded = @excludes.any? { |key, exclude| match(exclude, record[key].to_s) }
    next if excluded

    @count += 1
    @last_tag = tag
    @last_record = record
  end

  chain.next
end
#flush_emit ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 70

# Emits the last matched record if the window's count reached the threshold,
# then resets both counters.
#
# When @output_setting is truthy the emitted record is the result of #reform
# with "count", "interval" and "threshold" placeholder values; otherwise it is
# @last_record itself.
#
# Nothing is emitted when @last_count is below @threshold. Previously
# router.emit was called with a nil record in that case.
def flush_emit
  if @last_count >= @threshold
    output =
      if @output_setting
        placeholder_values = {
          "count" => @last_count,
          "interval" => @interval,
          "threshold" => @threshold
        }
        reform(@last_record, placeholder_values)
      else
        @last_record
      end
    # Guard against a nil record (e.g. threshold met without any stored record).
    router.emit(@last_tag, @last_checked, output) if output
  end

  @count = @last_count = 0
end
#match(regexp, string) ⇒ Object
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 113

# Matches string against regexp, tolerating invalid byte sequences.
#
# If the match raises an ArgumentError whose message starts with
# "invalid byte sequence in", the string is passed through
# replace_invalid_byte and the match is attempted once more. Any other
# ArgumentError, or a second failure after the replacement, is re-raised.
#
# @param regexp [#match] pattern to apply
# @param string [String] text to test
# @return [Object, nil] the result of regexp.match (nil when there is no match)
# @raise [ArgumentError] if matching fails for any other reason, or fails again
#   after the invalid bytes were replaced
def match(regexp, string)
  retried = false
  begin
    regexp.match(string)
  rescue ArgumentError => e
    # Retry at most once so a string that stays unmatchable cannot loop forever.
    raise if retried || !e.message.start_with?("invalid byte sequence in")

    string = replace_invalid_byte(string)
    retried = true
    retry
  end
end
#reform(record, placeholder_values) ⇒ Object
124 125 126 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 124

# Returns a copy of record with placeholder_values merged in.
#
# The input record is left untouched, since it originates from the incoming
# event stream; keys in placeholder_values win on conflict.
#
# @param record [Hash] record to extend
# @param placeholder_values [Hash] extra key/value pairs to add
# @return [Hash] a new hash containing both sets of entries
def reform(record, placeholder_values)
  record.merge(placeholder_values)
end
#shutdown ⇒ Object
52 53 54 55 56 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 52

# Stops the watcher thread after delegating to the superclass.
#
# Does nothing beyond `super` when no watcher thread exists (for example when
# shutdown is called without a preceding start), instead of raising on nil.
def shutdown
  super
  return unless @watcher

  @watcher.terminate
  @watcher.join
end
#start ⇒ Object
47 48 49 50 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 47

# Delegates to the superclass, then spawns the thread that runs #watch and
# keeps it in @watcher.
def start
  super
  @watcher = Thread.new { watch }
end
#watch ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_grepcount_filter.rb', line 58

# Endless polling loop run by the watcher thread.
#
# Every 0.5 seconds it checks whether at least @interval has elapsed on
# Engine.now since @last_checked; if so it records the new check time, copies
# @count into @last_count and calls #flush_emit.
def watch
  @last_checked ||= Engine.now

  loop do
    sleep 0.5
    next if Engine.now - @last_checked < @interval

    @last_checked = Engine.now
    @last_count = @count
    flush_emit
  end
end