Method: Fluent::Plugin::ElasticsearchInput#run_slice

Defined in:
lib/fluent/plugin/in_elasticsearch.rb

#run_slice(slice_id = nil) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 272

def run_slice(slice_id=nil)
  slice_query = @base_query
  slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
  result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
  es = Fluent::MultiEventStream.new

  result["hits"]["hits"].each {|hit| process_events(hit, es)}
  has_hits = result['hits']['hits'].any?
  scroll_id = result['_scroll_id']

  while has_hits && scroll_id
    result = process_next_scroll_request(es, scroll_id)
    has_hits = result['has_hits']
    scroll_id = result['_scroll_id']
  end

  router.emit_stream(@tag, es)
  if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("7.0.0")
    client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
  else
    client.clear_scroll(scroll_id: scroll_id) if scroll_id
  end
end