Class: Anschel::Output::Elasticsearch

Inherits:
Base
  • Object
show all
Defined in:
lib/anschel/output/elasticsearch.rb

Instance Method Summary collapse

Methods inherited from Base

#push, #stop

Constructor Details

#initialize(config, stats, log) ⇒ Elasticsearch

Returns a new instance of Elasticsearch.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/anschel/output/elasticsearch.rb', line 13

def initialize config, stats, log
  default_index = config.delete(:default_index) || '.anschel'
  qsize   = config.delete(:queue_size) || 2000
  bsize   = config.delete(:bulk_size) || 500
  timeout = config.delete(:bulk_timeout) || 2.0
  slice   = timeout / bsize
  client  = ::Elasticsearch::Client.new config
  client.transport.reload_connections!

  @queue = SizedQueue.new qsize

  @thread = Thread.new do
    loop do
      events = []
      count  = 0
      start  = Time.now.to_f
      until (Time.now.to_f - start > timeout) || (count > bsize)
        begin
          events.push @queue.shift(true)
          count += 1
        rescue # shift returned immediately
          sleep slice
        end
      end

      next if events.empty?

      body = events.map do |e|
        id = e.delete(:@id)
        type = e.delete(:@type) || e[:type]
        index = e.delete(:@index)
        routing = e.delete(:@routing)

        if index.nil?
          log_event = {
            event: 'elasticsearch-output-error',
            reason: 'event was not indexed',
            remediation: "sending to default index '#{default_index}'"
          }
          log_event[:raw_event] = event if log.debug?
          log.error log_event
          index = default_index
        end

        item = { _index: index, _type: type, data: e }
        item[:_routing] = routing if routing
        item[:_id] = id if id
        { index: item }
      end

      response = client.bulk body: body

      if response['errors']
        log.error \
          event: 'elasticsearch-output-error',
          reason: 'response contained errors',
          body_size: body.size,
          response_size: response['items'].size
      end

      stats.inc 'output', body.size
    end
  end
end