Class: Fluent::BufferedFilterOutput
- Inherits: BufferedOutput
- Object
- BufferedOutput
- Fluent::BufferedFilterOutput
- Defined in:
- lib/fluent/plugin/out_buffered_filter.rb
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 11

# Loads and validates the filter script named by @filter_path.
#
# After delegating to the superclass, the file's contents are evaluated with
# +instance_eval+ on a fresh Object, and the value of the script's last
# expression is stored in @filter. That value must respond to +call+.
#
# NOTE(review): @filter_path is presumably populated from the plugin
# configuration during +super+ -- confirm against the class's config params.
# NOTE(review): the filter file is executed as Ruby code with this process's
# privileges, so it must come from a trusted location.
#
# @param conf [Object] configuration object, passed through to +super+
# @raise [Fluent::ConfigError] if the file does not exist, if evaluating it
#   raises (including syntax and load errors), or if the resulting object
#   does not respond to +call+
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue StandardError, ScriptError => e
    # SyntaxError and LoadError descend from ScriptError, not StandardError,
    # so a bare `rescue` would let a malformed filter script escape as a raw
    # exception instead of a configuration error.
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end
#format(tag, time, record) ⇒ Object
29 30 31 |
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 29

# Serializes a single event for buffering.
#
# The three values are packed together as one Array so they can be read back
# as a unit later.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns for [tag, time, record]
#   (presumably a MessagePack-encoded String -- provided outside this file)
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#write(chunk) ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/fluent/plugin/out_buffered_filter.rb', line 33

# Passes every event in the chunk through the filter and emits the results.
#
# For each (tag, time, record) yielded by +chunk.msgpack_each+, the filter is
# called with those three values. Its return value is handed to
# +emit_records+ together with the original tag and time.
#
# @param chunk [#msgpack_each] buffered chunk that yields tag, time, record
# @return [Object] the return value of +chunk.msgpack_each+
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    filtered = @filter.call(tag, time, record)
    # Wrap a lone result so emit_records always receives an Array. Kernel#Array
    # is deliberately avoided: it would explode a Hash into key/value pairs.
    batch = filtered.is_a?(Array) ? filtered : [filtered]
    emit_records(tag, time, batch)
  end
end