15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/fluent/plugin/filter_jsonl_array_splitter.rb', line 15
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
record_lines = record[@key_name].to_s
parsed = []
record_lines.each_line do |line|
if @debug then
log.debug("Read JSON line: #{line}")
end
parsed << JSON.parse("#{line}")
end
parsed.each do |r|
new_record = record.clone
new_record.update(r)
if !@reserve_key then
new_record.delete(@key_name)
end
if @debug then
log.debug("New record by json line filter: #{new_record}")
end
new_es.add(time, new_record)
end
end
new_es
end
|