Class: Fluent::TimeSlicedFilterOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_time_sliced_filter.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 16

def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue => e
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end

#format(tag, time, record) ⇒ Object



34
35
36
37
38
39
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 34

def format(tag, time, record)
  # XXX: format_stream is not called
  # https://github.com/fluent/fluentd/blob/v0.10.43/lib/fluent/output.rb#L516
  filter_record(tag, time, record)
  [tag, time, record].to_msgpack
end

#write(chunk) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/out_time_sliced_filter.rb', line 41

def write(chunk)
  enum = chunk.to_enum(:msgpack_each)
  rows = @pass_hash_row ? enum.map {|_, _, record| record } : enum.to_a
  records = @filter.call(rows)
  records = [records] unless records.kind_of?(Array)
  time = enum.map {|_, time, _| time }.min

  if @emit_each_tag
    tags = enum.map {|tag, _, _| tag }
    tags.each {|tag| emit_records(tag, time, records) }
  else
    tag = enum.first[0]
    emit_records(tag, time, records)
  end
end