306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 306
def process_events(hit, es)
event = hit["_source"]
time = Fluent::Engine.now
if @parse_timestamp
if event.has_key?(TIMESTAMP_FIELD)
rts = event[TIMESTAMP_FIELD]
time = parse_time(rts, time, @tag)
end
end
if @docinfo
docinfo_target = event[@docinfo_target] || {}
unless docinfo_target.is_a?(Hash)
raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :type => docinfo_target.class, :event => event
end
@docinfo_fields.each do |field|
docinfo_target[field] = hit[field]
end
event[@docinfo_target] = docinfo_target
end
es.add(time, event)
end
|