Method: Fluent::Plugin::Output#handle_stream_with_standard_format

Defined in:
lib/fluent/plugin/output.rb

#handle_stream_with_standard_format(tag, es, enqueue: false) ⇒ Object



1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
# File 'lib/fluent/plugin/output.rb', line 1058

# Groups the events of +es+ by buffer metadata and writes them to the buffer
# using the format proc returned by +generate_format_proc+.
#
# Every (time, record) pair is passed with +tag+ to +metadata+; events that
# yield an equal metadata key are collected into the same MultiEventStream,
# so the buffer receives one stream per metadata key.
#
# @param tag [String] tag of the incoming event stream
# @param es [#each, #size, #to_msgpack_stream] event stream to emit
# @param enqueue [Boolean] passed through to +@buffer.write+ unchanged
# @return [true] always true when no exception is raised
def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= MultiEventStream.new
    meta_and_data[meta].add(time, record)
  end
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end
  # Metrics are updated only after the guarded write has returned.
  @emit_records_metrics.add(es.size)
  # Serializing the whole stream just to measure it is skipped unless size
  # metrics are enabled.
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end