Class: Fluent::Plugin::ConcatFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_concat.rb

Defined Under Namespace

Classes: TimeoutError

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ConcatFilter

Returns a new instance of ConcatFilter.



33
34
35
36
37
38
# File 'lib/fluent/plugin/filter_concat.rb', line 33

# Sets up the per-instance lookup tables after running the parent
# initializer.
#
# Both tables are auto-vivifying hashes: reading a key that is not present
# stores a default value under that key and returns it.
def initialize
  super

  # An unknown key is stamped with Fluent::Engine.now on first access.
  @timeout_map = Hash.new do |table, id|
    table[id] = Fluent::Engine.now
  end
  # An unknown key gets its own fresh, empty Array on first access.
  @buffer = Hash.new do |table, id|
    table[id] = []
  end
end

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/filter_concat.rb', line 40

# Validates the concat options and derives the operating mode.
#
# Reads @n_lines, @multiline_start_regexp, @multiline_end_regexp and
# @continuous_line_regexp (presumably populated by +super+ from +conf+;
# confirm against the plugin's config_param declarations) and sets @mode:
#
# * :line   when @n_lines is set
# * :regexp when a start or end pattern is set; every configured pattern
#   string is then replaced by a compiled Regexp
#
# @param conf the plugin configuration, forwarded to +super+
# @raise [Fluent::ConfigError] when n_lines is combined with any regexp
#   option, or when none of n_lines / multiline_start_regexp /
#   multiline_end_regexp is given
def configure(conf)
  super

  boundary_pattern = @multiline_start_regexp || @multiline_end_regexp
  any_pattern = boundary_pattern || @continuous_line_regexp

  if @n_lines && any_pattern
    raise Fluent::ConfigError, "n_lines and multiline_start_regexp/multiline_end_regexp/continuous_line_regexp are exclusive"
  end
  if [@n_lines, @multiline_start_regexp, @multiline_end_regexp].all?(&:nil?)
    raise Fluent::ConfigError, "Either n_lines or multiline_start_regexp or multiline_end_regexp is required"
  end

  if @n_lines
    @mode = :line
  elsif boundary_pattern
    @mode = :regexp
    # [1..-2] drops the first and last character of each pattern string
    # (presumably the surrounding "/" delimiters) before compiling.
    @multiline_start_regexp &&= Regexp.compile(@multiline_start_regexp[1..-2])
    @multiline_end_regexp &&= Regexp.compile(@multiline_end_regexp[1..-2])
    @continuous_line_regexp &&= Regexp.compile(@continuous_line_regexp[1..-2])
  else
    @mode = nil
  end
end

#filter_stream(tag, es) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/filter_concat.rb', line 80

# Runs every event of +es+ through the concat logic and returns the
# resulting event stream.
#
# An event is copied to the output unchanged when the tag is one of the
# fluent.trace / debug / info / warn / error / fatal tags, or when its
# record has no @key field. Every other event is handed to +process+; each
# event it returns is merged over the current record and appended to the
# output, stamped with the returned time when @use_first_timestamp is set
# and with the current event's time otherwise.
#
# An exception raised while handling an event is reported through
# +router.emit_error_event+ with that event's own time and record, and
# iteration continues with the next event.
#
# @param tag tag of the incoming events
# @param es  event stream yielding (time, record) pairs
# @return [Fluent::MultiEventStream] the filtered events
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  # The tag is identical for every event, so match it once up front.
  passthrough_tag = /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag
  es.each do |time, record|
    if passthrough_tag || !record.key?(@key)
      new_es.add(time, record)
      next
    end
    begin
      process(tag, time, record).each do |flushed_time, flushed_record|
        # Keep +time+ untouched: it must still be this event's timestamp
        # if the rescue below has to report an error.
        emit_time = @use_first_timestamp ? flushed_time : time
        new_es.add(emit_time, record.merge(flushed_record))
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end

#shutdown ⇒ Object



74
75
76
77
78
# File 'lib/fluent/plugin/filter_concat.rb', line 74

# Stops the filter: raises the finished flag, flushes what is still
# buffered via flush_remaining_buffer, then runs the parent class's
# shutdown.
def shutdown
  # Set before flushing. NOTE(review): presumably checked by the timer
  # callback so it stops working on the buffer - confirm in on_timer.
  @finished = true
  flush_remaining_buffer
  super
end

#start ⇒ Object



68
69
70
71
72
# File 'lib/fluent/plugin/filter_concat.rb', line 68

# Starts the filter: runs the parent class's start, clears the finished
# flag and registers a timer named :filter_concat_timer whose callback is
# on_timer.
def start
  super
  @finished = false
  # The second argument (1) is the timer interval - presumably seconds;
  # confirm against the timer_execute helper.
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end