Class: Fluent::Plugin::ElasticsearchErrorHandler
- Inherits:
  Object
  - Object
    - Fluent::Plugin::ElasticsearchErrorHandler
- Includes:
- ElasticsearchConstants
- Defined in:
- lib/fluent/plugin/elasticsearch_error_handler.rb
Defined Under Namespace
Classes: ElasticsearchError, ElasticsearchSubmitMismatch, ElasticsearchVersionMismatch
Constant Summary
Constants included from ElasticsearchConstants
Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP
Instance Attribute Summary
- #bulk_message_count ⇒ Object
  Returns the value of attribute bulk_message_count.
Instance Method Summary
- #handle_error(response, tag, chunk, bulk_message_count, extracted_values) ⇒ Object
- #initialize(plugin) ⇒ ElasticsearchErrorHandler (constructor)
  A new instance of ElasticsearchErrorHandler.
Constructor Details
#initialize(plugin) ⇒ ElasticsearchErrorHandler
Returns a new instance of ElasticsearchErrorHandler.
13 14 15 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 13
#
# Builds a handler bound to one plugin instance.
#
# @param plugin the object stored in @plugin; nothing is validated or
#   copied here, the reference is kept as given
def initialize(plugin)
  @plugin = plugin
end
Instance Attribute Details
#bulk_message_count ⇒ Object
Returns the value of attribute bulk_message_count.
8 9 10 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 8
#
# Reader for the bulk_message_count attribute.
#
# @return [Object] the current value of @bulk_message_count (nil if it was
#   never assigned)
def bulk_message_count
  @bulk_message_count
end
Instance Method Details
#handle_error(response, tag, chunk, bulk_message_count, extracted_values) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 17
#
# Walks an Elasticsearch bulk response alongside the chunk that produced it,
# tallies the outcome of every record and collects the records that should be
# retried.
#
# NOTE(review): the rendered listing had several identifiers stripped. The
# fourth parameter name is taken from the method signature shown in this
# document and `process_message` from the in-code comment; the locals
# `meta` / `bulk_message` and the call `append_record_to_messages` are
# restored from the upstream plugin source - confirm against the real file.
#
# @param response [Hash] bulk response; must hold an Array under 'items'
# @param tag tag forwarded to @plugin.process_message and to emitted error
#   events
# @param chunk [#msgpack_each] yields (time, record) pairs
# @param bulk_message_count number of submitted records; compared against
#   items.length
# @param extracted_values forwarded untouched to @plugin.process_message
# @return [nil] when no record had to be queued for retry
# @raise [ElasticsearchVersionMismatch] when response['items'] is not an Array
# @raise [ElasticsearchSubmitMismatch] when bulk_message_count differs from
#   the number of returned items
# @raise [Fluent::Plugin::ElasticsearchOutput::RetryStreamError] when at least
#   one record was added to the retry stream
def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
  items = response['items']
  unless items.is_a?(Array)
    raise ElasticsearchVersionMismatch, "The response format was unrecognized: #{response}"
  end
  if bulk_message_count != items.length
    raise ElasticsearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
  end

  # Consume a copy so the caller's response is left intact.
  pending_items = items.dup
  retry_stream = Fluent::MultiEventStream.new
  stats = Hash.new(0)
  meta = {}
  header = {}

  chunk.msgpack_each do |time, rawrecord|
    bulk_message = ''
    next unless rawrecord.is_a? Hash

    begin
      # we need a deep copy for process_message to alter
      processrecord = Marshal.load(Marshal.dump(rawrecord))
      meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, extracted_values)
      # Records the plugin declines to append have no response item; skip
      # them before taking the next item so both sequences stay aligned.
      next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
    rescue StandardError
      stats[:bad_chunk_record] += 1
      next
    end

    item = pending_items.shift
    if item.key?(@plugin.write_operation)
      write_operation = @plugin.write_operation
    elsif INDEX_OP == @plugin.write_operation && item.key?(CREATE_OP)
      write_operation = CREATE_OP
    else
      # When we don't have an expected ops field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end

    result = item[write_operation]
    unless result.key?('status')
      # When we don't have a status field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end
    status = result['status']

    if [200, 201].include?(status)
      stats[:successes] += 1
    elsif CREATE_OP == write_operation && 409 == status
      stats[:duplicates] += 1
    elsif 400 == status
      stats[:bad_argument] += 1
      @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new('400 - Rejected by Elasticsearch'))
    else
      error = result['error']
      if error.is_a?(Hash) && error.key?('type')
        # Count each retryable error exactly once under its error type.
        stats[error['type']] += 1
        retry_stream.add(time, rawrecord)
      else
        # When we don't have a type field, something changed in the API
        # expected return values (ES 2.x)
        stats[:errors_bad_resp] += 1
        @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - No error type provided in the response"))
      end
    end
  end

  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end

  raise Fluent::Plugin::ElasticsearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end