Class: Fluent::Plugin::IndicativeOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_indicative.rb

Instance Method Summary collapse

Instance Method Details

#process(tag, es) ⇒ Object



40
41
42
43
44
# File 'lib/fluent/plugin/out_indicative.rb', line 40

# Forwards an event stream to Indicative in groups of at most @batch_size.
#
# Each stream entry is destructured as a (time, record) pair; only the record
# is passed on to #send_events.
#
# @param tag [Object] not used by this method
# @param es [#each_slice] stream whose entries are (time, record) pairs
def process(tag, es)
  es.each_slice(@batch_size) do |batch|
    records = batch.map { |_time, record| record }
    send_events(records)
  end
end

#send_events(events) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_indicative.rb', line 56

# POSTs one batch of records to @api_url as an Indicative JSON payload.
#
# When @event_filter_key is set, records whose value under that key is exactly
# `false` are dropped; a missing (nil) value keeps the record. If no record
# survives the filter, nothing is sent. A response code other than "200" is
# logged as a warning, not raised.
#
# @param events [Array<Hash>] records to send
# @return [void]
# @raise [StandardError] DateTime.parse raises when a record's
#   @event_time_key value is missing or unparsable; errors raised by
#   Net::HTTP are not rescued here.
def send_events(events)
  # Compare against `false` explicitly: nil must NOT filter a record out.
  accepted = events.select { |data| !@event_filter_key || data[@event_filter_key] != false }
  # Skip the HTTP round trip when there is nothing to deliver.
  return if accepted.empty?

  indicative_events = accepted.map do |data|
    # The first configured key with a truthy value supplies the unique id.
    unique_id_key = @event_unique_id_keys.find { |k| data[k] }
    {
      eventName: data[@event_name_key],
      eventUniqueId: unique_id_key && data[unique_id_key],
      properties: flatten_hash(data),
      eventTime: DateTime.parse(data[@event_time_key]).rfc3339
    }
  end

  # Serialize once; the same string is used as request body and in the log.
  body = {apiKey: @api_key, events: indicative_events}.to_json

  uri = URI.parse(@api_url)
  http = Net::HTTP.new(uri.host, uri.port)
  # Follow the configured URL's scheme instead of forcing TLS unconditionally.
  http.use_ssl = uri.scheme == 'https'
  request = Net::HTTP::Post.new(uri.request_uri, {'Content-Type' => 'application/json'})
  request.body = body
  response = http.request(request)

  if response.code != "200"
    # NOTE(review): the logged payload includes the API key - consider redacting.
    log.warn("Indicative responded with error (code: #{response.code}): #{body} -> #{response.body}")
  end
end

#write(chunk) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_indicative.rb', line 46

# Delivers every record in a buffered chunk to Indicative, at most
# @batch_size records per #send_events call.
#
# @param chunk [#each] yields (time, record) pairs; the time is discarded
def write(chunk)
  buffered = []
  chunk.each { |_time, record| buffered.push(record) }
  buffered.each_slice(@batch_size) { |batch| send_events(batch) }
end