Class: Anschel::Output::Elasticsearch
- Defined in:
- lib/anschel/output/elasticsearch.rb
Instance Method Summary
-
#initialize(config, stats, log) ⇒ Elasticsearch
constructor
A new instance of Elasticsearch.
Methods inherited from Base
Constructor Details
#initialize(config, stats, log) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/anschel/output/elasticsearch.rb', line 13
#
# Builds the Elasticsearch output: creates the client, a bounded event queue
# (@queue) and a background thread (@thread) that drains the queue into bulk
# index requests.
#
# The following keys are removed from +config+; whatever remains is passed
# unchanged to ::Elasticsearch::Client.new:
#   :default_index - index used for events without :@index (default '.anschel')
#   :queue_size    - capacity of the SizedQueue               (default 2000)
#   :bulk_size     - maximum number of events per bulk call   (default 500)
#   :bulk_timeout  - seconds spent filling one batch          (default 2.0)
#
# Per-event keys :@id, :@type, :@index and :@routing are removed from the
# event and turned into bulk metadata; the rest of the event is sent as data.
#
# @param config [Hash] output settings plus client options (mutated)
# @param stats [#inc] receives inc('output', batch_size) after each bulk call
# @param log [#error, #debug?] logger used for error reporting
def initialize(config, stats, log)
  default_index = config.delete(:default_index) || '.anschel'
  qsize   = config.delete(:queue_size)   || 2000
  bsize   = config.delete(:bulk_size)    || 500
  timeout = config.delete(:bulk_timeout) || 2.0
  # Force float division: with integer settings `timeout / bsize` truncates
  # to 0 and the polling loop below would spin without ever sleeping.
  slice = timeout.to_f / bsize

  client = ::Elasticsearch::Client.new config
  client.transport.reload_connections!

  @queue = SizedQueue.new qsize

  # NOTE(review): an exception raised by client.bulk is not rescued here and
  # would end this thread; confirm whether that is the intended failure mode.
  @thread = Thread.new do
    loop do
      events = []
      # Monotonic clock so wall-clock adjustments cannot stretch or cut a batch.
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # Fill the batch until it holds bsize events or the timeout elapses.
      until events.size >= bsize ||
            Process.clock_gettime(Process::CLOCK_MONOTONIC) - start > timeout
        begin
          events.push @queue.shift(true)
        rescue ThreadError
          # Non-blocking shift on an empty queue: wait a little and retry.
          sleep slice
        end
      end

      next if events.empty?

      body = events.map do |e|
        id      = e.delete(:@id)
        type    = e.delete(:@type) || e[:type]
        index   = e.delete(:@index)
        routing = e.delete(:@routing)

        if index.nil?
          log_event = {
            event: 'elasticsearch-output-error',
            reason: 'event was not indexed',
            remediation: "sending to default index '#{default_index}'"
          }
          log_event[:raw_event] = e if log.debug?
          log.error log_event
          index = default_index
        end

        item = { _index: index, _type: type, data: e }
        item[:_routing] = routing if routing
        item[:_id] = id if id

        { index: item }
      end

      response = client.bulk body: body

      if response['errors']
        log.error \
          event: 'elasticsearch-output-error',
          reason: 'response contained errors',
          body_size: body.size,
          response_size: response['items'].size
      end

      stats.inc 'output', body.size
    end
  end
end