Class: Fluent::ElasticsearchErrorHandler

Inherits:
Object
  • Object
show all
Includes:
ElasticsearchConstants
Defined in:
lib/fluent/plugin/elasticsearch_error_handler.rb

Defined Under Namespace

Classes: BulkIndexQueueFull, ElasticsearchError, ElasticsearchOutOfMemory, ElasticsearchVersionMismatch, UnrecognizedElasticsearchError

Constant Summary

Constants included from ElasticsearchConstants

Fluent::ElasticsearchConstants::BODY_DELIMITER, Fluent::ElasticsearchConstants::CREATE_OP, Fluent::ElasticsearchConstants::ID_FIELD, Fluent::ElasticsearchConstants::INDEX_OP, Fluent::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::ElasticsearchConstants::UPDATE_OP, Fluent::ElasticsearchConstants::UPSERT_OP

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plugin, records = 0, bulk_message_count = 0) ⇒ ElasticsearchErrorHandler

Returns a new instance of ElasticsearchErrorHandler.



12
13
14
15
16
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 12

# Builds an error handler bound to an output plugin instance.
#
# @param plugin [Object] the owning plugin; other methods of this class read
#   +write_operation+ and +log+ from it
# @param records [Object] stored as-is in +@records+ (defaults to 0)
# @param bulk_message_count [Object] stored as-is in +@bulk_message_count+
#   (defaults to 0)
def initialize(plugin, records = 0, bulk_message_count = 0)
  @plugin, @records, @bulk_message_count = plugin, records, bulk_message_count
end

Instance Attribute Details

#bulk_message_count ⇒ Object

Returns the value of attribute bulk_message_count.



6
7
8
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 6

# Reader for the +@bulk_message_count+ instance variable.
#
# The constructor initialises it from its +bulk_message_count+ argument
# (0 by default).
#
# @return [Object] the current value of +@bulk_message_count+
#   (presumably an Integer count of messages in the bulk request -- confirm
#   against the caller)
def bulk_message_count
  @bulk_message_count
end

#records ⇒ Object

Returns the value of attribute records.



6
7
8
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 6

# Reader for the +@records+ instance variable.
#
# The constructor initialises it from its +records+ argument (0 by default).
#
# @return [Object] the current value of +@records+
#   (presumably a record count, given the numeric default -- confirm against
#   the caller)
def records
  @records
end

Instance Method Details

#handle_error(response) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 18

# Tallies the per-item outcomes of an Elasticsearch bulk response and raises
# when the request as a whole must be retried.
#
# Each entry of <tt>response['items']</tt> is looked up under the plugin's
# write operation (falling back to CREATE_OP when the plugin is configured
# for INDEX_OP) and counted as one of:
#
# * +:successes+       -- status 200 or 201
# * +:duplicates+      -- status 409 on a create operation
# * the error type     -- value of <tt>error['type']</tt> for any other status
# * +:errors_bad_resp+ -- the item does not have the expected structure
#
# A response that is not a Hash, has no +items+ array, or contains items,
# results or +error+ values of an unexpected shape (for example a plain
# String error) is counted as +:errors_bad_resp+ instead of crashing with
# NoMethodError, so the caller always sees one of the documented errors.
#
# @param response [Hash] parsed bulk API response
# @raise [ElasticsearchVersionMismatch] if any part of the response could not
#   be interpreted
# @raise [BulkIndexQueueFull] if any item failed with
#   +es_rejected_execution_exception+
# @raise [ElasticsearchOutOfMemory] if any item failed with
#   +out_of_memory_error+
# @raise [ElasticsearchError] for any other failure
def handle_error(response)
  stats = Hash.new(0)
  expected_op = @plugin.write_operation
  items = response.is_a?(Hash) ? response['items'] : nil

  if items.is_a?(Array)
    items.each do |item|
      if item.is_a?(Hash) && item.key?(expected_op)
        write_operation = expected_op
      elsif item.is_a?(Hash) && INDEX_OP == expected_op && item.key?(CREATE_OP)
        write_operation = CREATE_OP
      else
        # When we don't have an expected ops field, something changed in the API
        # expected return values (ES 2.x)
        stats[:errors_bad_resp] += 1
        next
      end

      result = item[write_operation]
      unless result.is_a?(Hash) && result.key?('status')
        # When we don't have a status field, something changed in the API
        # expected return values (ES 2.x)
        stats[:errors_bad_resp] += 1
        next
      end

      status = result['status']
      if [200, 201].include?(status)
        stats[:successes] += 1
      elsif CREATE_OP == write_operation && 409 == status
        stats[:duplicates] += 1
      else
        error = result['error']
        if error.is_a?(Hash) && error.key?('type')
          stats[error['type']] += 1
        else
          # When we don't have a type field (or the error is not a structured
          # object at all), something changed in the API expected return values
          stats[:errors_bad_resp] += 1
        end
      end
    end
  else
    # No iterable 'items' collection: the whole response is unparseable.
    stats[:errors_bad_resp] += 1
  end

  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end

  # Order matters: an unparseable response takes precedence over everything
  # else, and a fully successful batch must be recognised before any
  # error-type check.
  if stats[:errors_bad_resp] > 0
    @plugin.log.on_debug { @plugin.log.debug("Unable to parse response from elasticsearch, likely an API version mismatch:  #{response}") }
    raise ElasticsearchVersionMismatch, "Unable to parse error response from Elasticsearch, likely an API version mismatch. Add '@log_level debug' to your config to see the full response"
  elsif stats[:successes] + stats[:duplicates] == bulk_message_count
    @plugin.log.info("retry succeeded - successes=#{stats[:successes]} duplicates=#{stats[:duplicates]}")
  elsif stats['es_rejected_execution_exception'] > 0
    raise BulkIndexQueueFull, 'Bulk index queue is full, retrying'
  elsif stats['out_of_memory_error'] > 0
    raise ElasticsearchOutOfMemory, 'Elasticsearch has exhausted its heap, retrying'
  else
    @plugin.log.on_debug { @plugin.log.debug("Elasticsearch errors returned, retrying:  #{response}") }
    raise ElasticsearchError, "Elasticsearch returned errors, retrying. Add '@log_level debug' to your config to see the full response"
  end
end