Class: Fluent::Plugin::ElasticsearchErrorHandler

Inherits:
Object
  • Object
show all
Includes:
ElasticsearchConstants
Defined in:
lib/fluent/plugin/elasticsearch_error_handler.rb

Defined Under Namespace

Classes: BulkIndexQueueFull, ElasticsearchError, ElasticsearchOutOfMemory, ElasticsearchVersionMismatch, UnrecognizedElasticsearchError

Constant Summary

Constants included from ElasticsearchConstants

Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plugin, records = 0, bulk_message_count = 0) ⇒ ElasticsearchErrorHandler

Returns a new instance of ElasticsearchErrorHandler.



12
13
14
15
16
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 12

# Builds an error handler bound to an output plugin.
#
# @param plugin [Object] owning plugin; handle_error calls its
#   `write_operation` and `log` methods
# @param records [Object] stored as-is and exposed through #records
#   (defaults to 0)
# @param bulk_message_count [Object] stored as-is and exposed through
#   #bulk_message_count (defaults to 0)
def initialize(plugin, records = 0, bulk_message_count = 0)
  @plugin, @records, @bulk_message_count = plugin, records, bulk_message_count
end

Instance Attribute Details

#bulk_message_count ⇒ Object

Returns the value of attribute bulk_message_count.



6
7
8
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 6

# Reader for the bulk message count held by this handler.
#
# @return [Object] the current value of @bulk_message_count, which the
#   constructor sets from its `bulk_message_count` argument (0 when omitted)
def bulk_message_count
  @bulk_message_count
end

#records ⇒ Object

Returns the value of attribute records.



6
7
8
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 6

# Reader for the records value held by this handler.
#
# @return [Object] the current value of @records, which the constructor sets
#   from its `records` argument (0 when omitted)
def records
  @records
end

Instance Method Details

#handle_error(response) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 18

# Inspects a parsed Elasticsearch bulk response, logs a summary of the per-item
# outcomes, and raises when the response cannot be interpreted or when any
# item reported an error.
#
# Each entry of response['items'] is expected to be a Hash keyed by the
# plugin's write operation (or by CREATE_OP when the plugin is configured for
# INDEX_OP), whose value is a Hash carrying a 'status' and, for 429/500
# statuses, an 'error' Hash with a 'type'. Anything that does not match this
# shape is counted as an unparseable item rather than crashing with
# NoMethodError.
#
# @param response [Hash] parsed bulk API response
# @return [Hash] the (empty) per-type error tally when nothing was raised
# @raise [ElasticsearchVersionMismatch] when 'items' is missing/not an Array,
#   or any item lacks the expected operation, status, or error type fields
# @raise [UnrecognizedElasticsearchError] when an item has a status other
#   than 200, 201, 400, 429, 500 (or 409 for a create operation)
# @raise [ElasticsearchOutOfMemory] when the first tallied error type is
#   'out_of_memory_error'
# @raise [BulkIndexQueueFull] when the first tallied error type is
#   'es_rejected_execution_exception'
# @raise [ElasticsearchError] when the first tallied error type is anything else
def handle_error(response)
  errors = Hash.new(0)
  errors_bad_resp = 0
  errors_unrecognized = 0
  successes = 0
  duplicates = 0
  bad_arguments = 0

  items = response.is_a?(Hash) ? response['items'] : nil
  unless items.is_a?(Array)
    # No usable item list at all: report it through the same version-mismatch
    # path as any other malformed response instead of failing on nil.each.
    errors_bad_resp += 1
    items = []
  end

  items.each do |item|
    write_operation =
      if !item.is_a?(Hash)
        nil
      elsif item.key?(@plugin.write_operation)
        @plugin.write_operation
      elsif INDEX_OP == @plugin.write_operation && item.key?(CREATE_OP)
        CREATE_OP
      end
    result = write_operation && item[write_operation]
    unless result.is_a?(Hash) && result.key?('status')
      # When we don't have the expected ops field or a status field, something
      # changed in the API expected return values (ES 2.x)
      errors_bad_resp += 1
      next
    end
    status = result['status']

    case
    when CREATE_OP == write_operation && 409 == status
      duplicates += 1
    when 400 == status
      bad_arguments += 1
      @plugin.log.debug "Elasticsearch rejected document: #{item}"
    when [429, 500].include?(status)
      error = result['error']
      unless error.is_a?(Hash) && error.key?('type')
        # When we don't have a type field (e.g. 'error' is a plain String),
        # something changed in the API expected return values (ES 2.x)
        errors_bad_resp += 1
        next
      end
      errors[error['type']] += 1
    when [200, 201].include?(status)
      successes += 1
    else
      errors_unrecognized += 1
    end
  end

  if errors_bad_resp > 0
    msg = "Unable to parse error response from Elasticsearch, likely an API version mismatch  #{response}"
    @plugin.log.error msg
    raise ElasticsearchVersionMismatch, msg
  end
  if bad_arguments > 0
    @plugin.log.warn "Elasticsearch rejected #{bad_arguments} documents due to invalid field arguments"
  end
  if duplicates > 0
    @plugin.log.info "Encountered #{duplicates} duplicate(s) of #{successes} indexing chunk, ignoring"
  end
  msg = "Indexed (op = #{@plugin.write_operation}) #{successes} successfully, #{duplicates} duplicate(s), #{bad_arguments} bad argument(s), #{errors_unrecognized} unrecognized error(s)"
  errors.each do |type, count|
    msg << ", #{count} #{type} error(s)"
  end
  @plugin.log.debug msg
  if errors_unrecognized > 0
    raise UnrecognizedElasticsearchError, "Unrecognized elasticsearch errors returned, retrying  #{response}"
  end

  # Any tallied error aborts the chunk; the first error type encountered
  # (hash insertion order) decides which exception is raised.
  unless errors.empty?
    case errors.keys.first
    when 'out_of_memory_error'
      raise ElasticsearchOutOfMemory, "Elasticsearch has exhausted its heap, retrying"
    when 'es_rejected_execution_exception'
      raise BulkIndexQueueFull, "Bulk index queue is full, retrying"
    else
      raise ElasticsearchError, "Elasticsearch errors returned, retrying  #{response}"
    end
  end
  errors
end