272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 272
def run_slice(slice_id=nil)
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
es = Fluent::MultiEventStream.new
result["hits"]["hits"].each {|hit| process_events(hit, es)}
has_hits = result['hits']['hits'].any?
scroll_id = result['_scroll_id']
while has_hits && scroll_id
result = process_next_scroll_request(es, scroll_id)
has_hits = result['has_hits']
scroll_id = result['_scroll_id']
end
router.emit_stream(@tag, es)
if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("7.0.0")
client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
else
client.clear_scroll(scroll_id: scroll_id) if scroll_id
end
end
|