Class: Fluent::Plugin::ConcatFilter
- Inherits: Filter
- Inheritance chain: Object → Filter → Fluent::Plugin::ConcatFilter
- Defined in:
- lib/fluent/plugin/filter_concat.rb
Defined Under Namespace
Classes: TimeoutError
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #filter_stream(tag, es) ⇒ Object
- #initialize ⇒ ConcatFilter (constructor): Returns a new instance of ConcatFilter.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ ConcatFilter
Returns a new instance of ConcatFilter.
33 34 35 36 37 38 |
# File 'lib/fluent/plugin/filter_concat.rb', line 33
# Builds a new ConcatFilter with empty, lazily populated state.
#
# Both instance variables are hashes that create and store an entry the first
# time an unknown key is looked up, so readers never see +nil+ for a new key.
#
# @return [ConcatFilter] a new instance of ConcatFilter
def initialize
  super

  # An unknown key starts out with its own fresh, empty Array.
  @buffer = Hash.new do |store, entry_key|
    store[entry_key] = []
  end

  # An unknown key is stamped with Fluent::Engine.now at first access.
  @timeout_map = Hash.new do |store, entry_key|
    store[entry_key] = Fluent::Engine.now
  end
end
Instance Method Details
#configure(conf) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/filter_concat.rb', line 40
# Validates the concat settings and derives the operating mode.
#
# After +super+ has run, exactly one strategy must be selected:
# * +@n_lines+ set                      -> +@mode+ becomes +:line+
# * a start and/or end regexp is set    -> +@mode+ becomes +:regexp+ and every
#   configured pattern string is replaced by a compiled Regexp.
#
# Patterns written as "/.../" have the surrounding slashes removed before
# compilation. A pattern that is NOT wrapped in slashes is compiled as-is
# rather than losing its first and last character.
#
# @param conf [Object] configuration object, handed to +super+ untouched
# @raise [Fluent::ConfigError] if n_lines is combined with any regexp option,
#   or if none of n_lines / multiline_start_regexp / multiline_end_regexp is set
def configure(conf)
  super

  if @n_lines && (@multiline_start_regexp || @multiline_end_regexp || @continuous_line_regexp)
    raise Fluent::ConfigError, "n_lines and multiline_start_regexp/multiline_end_regexp/continuous_line_regexp are exclusive"
  end
  if @n_lines.nil? && @multiline_start_regexp.nil? && @multiline_end_regexp.nil?
    raise Fluent::ConfigError, "Either n_lines or multiline_start_regexp or multiline_end_regexp is required"
  end

  # Strip the "/.../" delimiters only when both are really present, so a bare
  # pattern is not silently truncated.
  compile = lambda do |pattern|
    delimited = pattern.length >= 2 && pattern.start_with?("/") && pattern.end_with?("/")
    Regexp.compile(delimited ? pattern[1..-2] : pattern)
  end

  @mode = nil
  if @n_lines
    @mode = :line
  elsif @multiline_start_regexp || @multiline_end_regexp
    @mode = :regexp
    # `&&=` leaves options that were not configured as they are.
    @multiline_start_regexp &&= compile.call(@multiline_start_regexp)
    @multiline_end_regexp &&= compile.call(@multiline_end_regexp)
    @continuous_line_regexp &&= compile.call(@continuous_line_regexp)
  end
end
#filter_stream(tag, es) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/filter_concat.rb', line 80
# Runs every event of +es+ through the concat logic and returns the resulting
# stream.
#
# Events are copied to the output unchanged when the tag is one of the
# fluent.* log-level tags or when the record has no +@key+ field. All other
# events are handed to +process+; each entry it yields is merged over the
# current record and added to the output. When +@use_first_timestamp+ is set,
# the timestamp yielded by +process+ is used instead of the current event's.
#
# Any StandardError raised while handling an event is reported through
# +router.emit_error_event+ with that event's own time and record.
#
# @param tag [String] tag of the incoming event stream
# @param es [#each] stream yielding (time, record) pairs
# @return [Fluent::MultiEventStream] the filtered stream
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new

  # The tag is identical for every event, so test it once rather than per record.
  internal_log_tag = !(/\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag).nil?

  es.each do |time, record|
    if internal_log_tag || !record.key?(@key)
      new_es.add(time, record)
      next
    end

    begin
      process(tag, time, record).each do |flushed_time, new_record|
        # Use a separate local so +time+ still refers to the current event if
        # the rescue below has to report an error.
        emit_time = @use_first_timestamp ? flushed_time : time
        new_es.add(emit_time, record.merge(new_record))
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end

  new_es
end
#shutdown ⇒ Object
74 75 76 77 78 |
# File 'lib/fluent/plugin/filter_concat.rb', line 74 def shutdown @finished = true flush_remaining_buffer super end |
#start ⇒ Object
68 69 70 71 72 |
# File 'lib/fluent/plugin/filter_concat.rb', line 68 def start super @finished = false timer_execute(:filter_concat_timer, 1, &method(:on_timer)) end |