Class: Fluent::BufferedFilterOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_buffered_filter.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 11

def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue => e
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end

#format(tag, time, record) ⇒ Object



29
30
31
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 29

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#write(chunk) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 33

def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    records = @filter.call(tag, time, record)
    records = [records] unless records.kind_of?(Array)
    emit_records(tag, time, records)
  end
end