Class: Anschel::Output::Elasticsearch

Inherits:
Base
  • Object
show all
Defined in:
lib/anschel/output/elasticsearch.rb

Instance Method Summary collapse

Methods inherited from Base

#push, #stop

Constructor Details

#initialize(config, stats, log) ⇒ Elasticsearch

Returns a new instance of Elasticsearch.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/anschel/output/elasticsearch.rb', line 13

def initialize config, stats, log
  default_index = config.delete(:default_index) || '.anschel'
  qsize   = config.delete(:queue_size) || 2000
  bsize   = config.delete(:bulk_size) || 500
  timeout = config.delete(:bulk_timeout) || 2.0
  slice   = timeout / bsize
  client  = ::Elasticsearch::Client.new config
  client.transport.reload_connections!

  @queue = SizedQueue.new qsize

  @thread = Thread.new do
    loop do
      events = []
      count  = 0
      start  = Time.now.to_f
      until (Time.now.to_f - start > timeout) || (count > bsize)
        begin
          events.push @queue.shift(true)
          count += 1
        rescue # shift returned immediately
          sleep slice
        end
      end

      next if events.empty?

      body = events.map do |e|
        index = e.delete(:_index)
        routing = e.delete(:_routing)

        if index.nil?
          log.error \
            event: 'elasticsearch-output-error',
            reason: 'event was not indexed',
            remediation: "sending to default index '#{default_index}'",
            raw_event: e
          index = default_index
        end

        item = { _index: index, _type: e[:type], data: e }
        item[:_routing] = routing if routing
        { index: item }
      end

      response = client.bulk body: body

      if response['errors']
        log.error \
          event: 'elasticsearch-output-error',
          reason: 'response contained errors',
          body: body,
          response: response
      end

      stats.inc 'output', body.size
    end
  end
end