Class: Fluent::Plugin::ConcatFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_concat.rb

Defined Under Namespace

Classes: TimeoutError

Instance Method Summary collapse

Constructor Details

#initializeConcatFilter

Returns a new instance of ConcatFilter.



39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/filter_concat.rb', line 39

def initialize
  super

  @buffer = Hash.new {|h, k| h[k] = [] }
  @timeout_map_mutex = Thread::Mutex.new
  @timeout_map_mutex.synchronize do
    @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
  end
end

Instance Method Details

#configure(conf) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/filter_concat.rb', line 49

def configure(conf)
  super

  if @n_lines && (@multiline_start_regexp || @multiline_end_regexp)
    raise Fluent::ConfigError, "n_lines and multiline_start_regexp/multiline_end_regexp are exclusive"
  end
  if @n_lines.nil? && @multiline_start_regexp.nil? && @multiline_end_regexp.nil? && @partial_key.nil?
    raise Fluent::ConfigError, "Either n_lines or multiline_start_regexp or multiline_end_regexp is required"
  end
  if @partial_key && @n_lines
    raise Fluent::ConfigError, "partial_key and n_lines are exclusive"
  end
  if @partial_key && (@multiline_start_regexp || @multiline_end_regexp)
    raise Fluent::ConfigError, "partial_key and multiline_start_regexp/multiline_end_regexp are exclusive"
  end
  if @partial_key && @partial_value.nil?
    raise Fluent::ConfigError, "partial_value is required when partial_key is specified"
  end

  @mode = nil
  case
  when @n_lines
    @mode = :line
  when @partial_key
    @mode = :partial
  when @multiline_start_regexp || @multiline_end_regexp
    @mode = :regexp
    if @multiline_start_regexp
      @multiline_start_regexp = Regexp.compile(@multiline_start_regexp[1..-2])
    end
    if @multiline_end_regexp
      @multiline_end_regexp = Regexp.compile(@multiline_end_regexp[1..-2])
    end
    if @continuous_line_regexp
      @continuous_line_regexp = Regexp.compile(@continuous_line_regexp[1..-2])
    end
  end
end

#filter_stream(tag, es) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/plugin/filter_concat.rb', line 100

def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    if /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag
      new_es.add(time, record)
      next
    end
    unless record.key?(@key)
      new_es.add(time, record)
      next
    end
    begin
      flushed_es = process(tag, time, record)
      unless flushed_es.empty?
        flushed_es.each do |_time, new_record|
          time = _time if @use_first_timestamp
          merged_record = record.merge(new_record)
          merged_record.delete(@partial_key) unless @keep_partial_key
          new_es.add(time, merged_record)
        end
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end

#shutdownObject



94
95
96
97
98
# File 'lib/fluent/plugin/filter_concat.rb', line 94

def shutdown
  @finished = true
  flush_remaining_buffer
  super
end

#startObject



88
89
90
91
92
# File 'lib/fluent/plugin/filter_concat.rb', line 88

def start
  super
  @finished = false
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end