1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
|
# File 'lib/fluent/plugin/output.rb', line 1058
# Groups the events of +es+ by their metadata and writes the grouped
# streams to the buffer using the standard format proc.
#
# @param tag [String] tag passed to +metadata+ for every event
# @param es [#each, #size, #to_msgpack_stream] event stream to buffer;
#   +each+ is called with an +unpacker:+ keyword and must yield
#   +(time, record)+ pairs
# @param enqueue [Boolean] forwarded unchanged to +@buffer.write+
# @return [true] always, once the write and metric updates have completed
def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc

  # One MultiEventStream per distinct metadata key, created on first use.
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    (meta_and_data[meta] ||= MultiEventStream.new).add(time, record)
  end

  # The write runs inside write_guard; its error/overflow policy is defined
  # elsewhere in the class (not visible in this block).
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end

  # Metrics are taken from the incoming stream itself, so no per-event
  # counter is kept while iterating.
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
|