58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/fluent/plugin/out_copy.rb', line 58
def process(tag, es)
unless es.repeatable?
m = Fluent::MultiEventStream.new
es.each {|time,record|
m.add(time, record)
}
es = m
end
success = Array.new(outputs.size)
outputs.each_with_index do |output, i|
begin
if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i
else
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
success[i] = true
end
rescue => e
if @ignore_errors[i]
log.error "ignore emit error in #{output.plugin_id}", error: e
else
raise e
end
end
end
end
|