Method: Fluent::Plugin::ElasticsearchInput#process_events

Defined in:
lib/fluent/plugin/in_elasticsearch.rb

#process_events(hit, es) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 306

def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now
  if @parse_timestamp
    if event.has_key?(TIMESTAMP_FIELD)
      rts = event[TIMESTAMP_FIELD]
      time = parse_time(rts, time, @tag)
    end
  end
  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :type => docinfo_target.class, :event => event
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end
  es.add(time, event)
end