Class: Fluent::Plugin::IndicativeOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_indicative.rb

Instance Method Summary collapse

Instance Method Details

#process(tag, es) ⇒ Object



30
31
32
33
34
# File 'lib/fluent/plugin/out_indicative.rb', line 30

# Forwards every record in the event stream to #send_event, one at a time.
#
# @param tag [Object] the tag accompanying the events (not used here)
# @param es [#each] event stream yielding (time, record) pairs
# @return [Object] whatever +es.each+ returns
def process(tag, es)
  # Only the record is forwarded; the per-event time is ignored.
  es.each { |_time, record| send_event(record) }
end

#send_event(data) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_indicative.rb', line 36

def send_event(data)
  uri = URI.parse(@api_url)

  headers = {'Content-Type' => 'application/json'}

  unique_id_key = @event_unique_id_keys.find {|k| data[k]}

  payload = {
    apiKey: @api_key,
    eventName: data[@event_name_key],
    eventUniqueId: unique_id_key && data[unique_id_key],
    properties: flatten_hash(data),
    eventTime: Date.parse(data[@event_time_key]).rfc3339
  }

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = true
  request = Net::HTTP::Post.new(uri.request_uri, headers)
  request.body = payload.to_json
  response = http.request(request)

  if response.code != 200
      log.warn("Indicative responded with error: #{response.body} for #{payload.to_json}")
  end
end