34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 34
# Reconciles an Elasticsearch bulk response with the records of the chunk
# that produced it, classifying every per-record result.
#
# Records are replayed through @plugin.process_message /
# @plugin.append_record_to_messages so that only records which would have
# been part of the bulk request consume an entry of response['items'].
#
# Per-record outcome:
# * status 200/201                 -> counted as :successes
# * status 409 on a create op      -> counted as :duplicates
# * status 400                     -> counted as :bad_argument and emitted as
#                                     an error event (not retried)
# * any other status with an error type -> counted under that type and added
#                                     to the retry stream
# * anything else                  -> counted as :errors_bad_resp
#
# NOTE(review): the identifiers `header` and `extracted_values` were blank in
# the reviewed copy of this method and have been restored from the way the
# values are threaded through the calls below -- confirm against upstream.
#
# @param response [Hash] parsed bulk response; its 'items' array is consumed
#   (shifted) while records are matched
# @param tag [Object] forwarded to process_message and to emitted error events
# @param chunk [#msgpack_each] yields (time, record) pairs
# @param bulk_message_count [Integer] number of records that were submitted
# @param extracted_values [Object] forwarded untouched to process_message
# @raise [ElasticsearchVersionMismatch] when response['items'] is not an Array
# @raise [ElasticsearchSubmitMismatch] when the item count differs from
#   bulk_message_count
# @raise [ElasticsearchRequestAbortError] as soon as an error type is reported
#   for which unrecoverable_error? is true
# @raise [Fluent::Plugin::ElasticsearchOutput::RetryStreamError] when at least
#   one record was queued for retry
def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
  items = response['items']
  unless items.is_a?(Array)
    raise ElasticsearchVersionMismatch, "The response format was unrecognized: #{response}"
  end
  if bulk_message_count != items.length
    raise ElasticsearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
  end

  retry_stream = Fluent::MultiEventStream.new
  stats = Hash.new(0)
  meta = {}
  header = {}

  chunk.msgpack_each do |time, rawrecord|
    bulk_message = ''
    next unless rawrecord.is_a? Hash

    begin
      # Deep copy: process_message receives a copy so rawrecord stays intact
      # for error events and the retry stream.
      processrecord = Marshal.load(Marshal.dump(rawrecord))
      meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, extracted_values)
      next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
    rescue StandardError
      stats[:bad_chunk_record] += 1
      next
    end

    # Only records that made it into the bulk request consume a response item.
    item = items.shift
    if item.key?(@plugin.write_operation)
      write_operation = @plugin.write_operation
    elsif INDEX_OP == @plugin.write_operation && item.key?(CREATE_OP)
      # An index request may be answered under the create key.
      write_operation = CREATE_OP
    else
      stats[:errors_bad_resp] += 1
      next
    end

    result = item[write_operation]
    unless result.key?('status')
      stats[:errors_bad_resp] += 1
      next
    end
    status = result['status']

    if [200, 201].include?(status)
      stats[:successes] += 1
    elsif CREATE_OP == write_operation && 409 == status
      stats[:duplicates] += 1
    elsif 400 == status
      stats[:bad_argument] += 1
      reason = ""
      # The reason text is only built when log_es_400_reason yields.
      log_es_400_reason do
        if result.key?('error')
          error = result['error']
          reason = " [error type]: #{error['type']}" if error.key?('type')
          reason += " [reason]: \'#{error['reason']}\'" if error.key?('reason')
        end
      end
      @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("400 - Rejected by Elasticsearch#{reason}"))
    elsif result.key?('error') && result['error'].key?('type')
      type = result['error']['type']
      # Counted exactly once per record (previously incremented twice).
      stats[type] += 1
      retry_stream.add(time, rawrecord)
      if unrecoverable_error?(type)
        raise ElasticsearchRequestAbortError, "Rejected Elasticsearch due to #{type}"
      end
    else
      stats[:errors_bad_resp] += 1
      @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - No error type provided in the response"))
    end
  end

  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end

  raise Fluent::Plugin::ElasticsearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end
|