Class: Fluent::TimeSlicedFilterOutput
- Inherits:
  - TimeSlicedOutput
- Inheritance hierarchy:
  - Object
    - TimeSlicedOutput
      - Fluent::TimeSlicedFilterOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_time_sliced_filter.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 16
#
# Loads and validates the user-supplied filter script.
#
# The file at @filter_path is evaluated in the context of a fresh Object and
# the value of its last expression is stored in @filter. That value must
# respond to `call`.
#
# NOTE(review): @filter_path is presumably populated by the parent
# `configure` via a config parameter declared elsewhere in the class -- confirm.
#
# SECURITY: the filter file is executed as arbitrary Ruby code. It must only
# ever point at a trusted, operator-controlled file.
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] if the file is missing, fails to load, or does
#   not evaluate to a callable object
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue StandardError, ScriptError => e
    # SyntaxError and LoadError descend from ScriptError, not StandardError,
    # so a bare `rescue` would let a malformed filter file escape as a raw
    # exception instead of a configuration error.
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end
#format(tag, time, record) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 34
#
# Serializes a single event into the MessagePack form stored in the buffer.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the MessagePack encoding of the [tag, time, record] triple
def format(tag, time, record)
  # format_stream is not called for this output, see
  # https://github.com/fluent/fluentd/blob/v0.10.43/lib/fluent/output.rb#L516
  # so filter_record is invoked here by hand.
  # NOTE(review): filter_record presumably comes from the included
  # SetTagKeyMixin / SetTimeKeyMixin and may modify `record` -- confirm.
  filter_record(tag, time, record)

  event = [tag, time, record]
  event.to_msgpack
end
#write(chunk) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 41
#
# Flushes one buffered chunk: passes every row in the chunk through the user
# filter and emits the filter's result.
#
# The chunk is enumerated via `msgpack_each`, which is expected to yield
# [tag, time, record] triples (the shape produced by #format).
#
# * When @pass_hash_row is truthy the filter receives only the records;
#   otherwise it receives the full [tag, time, record] rows.
# * A non-Array filter result is wrapped in a one-element Array.
# * The result is emitted with the smallest event time found in the chunk.
# * When @emit_each_tag is truthy the result is emitted once for every
#   distinct tag in the chunk; otherwise only under the first row's tag.
#
# @param chunk an object responding to `msgpack_each`
def write(chunk)
  enum = chunk.to_enum(:msgpack_each)

  rows = @pass_hash_row ? enum.map { |_, _, record| record } : enum.to_a
  records = @filter.call(rows)
  records = [records] unless records.kind_of?(Array)

  time = enum.map { |_, event_time, _| event_time }.min

  # Test the flag; do not assign to it. An assignment here would overwrite
  # the configured value with an always-truthy Array and make the
  # single-tag branch unreachable.
  if @emit_each_tag
    # `uniq` ensures the result is emitted once per tag rather than once
    # per buffered row.
    enum.map { |tag, _, _| tag }.uniq.each do |tag|
      emit_records(tag, time, records)
    end
  else
    tag = enum.first[0]
    emit_records(tag, time, records)
  end
end