Class: Fluent::Plugin::ReckonerCdpOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_reckoner_cdp.rb

Constant Summary collapse

MAXIMUM_QUERY_STRING_LENGTH =
10000

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



18
19
20
21
22
23
# File 'lib/fluent/plugin/out_reckoner_cdp.rb', line 18

# Reads the plugin's settings out of the configuration object after the
# superclass has processed it.
#
# @param conf [#[]] configuration object indexed by string keys
# @return [Object]
def configure(conf)
  super
  @token = conf['token']
  @endpoint = conf['endpoint']
  @client_id = conf['client_id']
  # #write passes @workflow_id to the client, yet nothing visible assigns it.
  # Only fill it in when the superclass has not already populated it.
  # NOTE(review): the 'workflow_id' key name is an assumption -- confirm
  # against the plugin's documented configuration.
  @workflow_id ||= conf['workflow_id']
end

#start ⇒ Object



25
26
27
28
# File 'lib/fluent/plugin/out_reckoner_cdp.rb', line 25

# Builds the Reckoner CDP client from the values stored by #configure.
#
# @return [Object] the newly created client (also kept in @client)
def start
  super
  # Use the instance variable, like the endpoint and client id, rather than a
  # bare `token` call that only works if an accessor happens to be defined.
  @client = Reckoner::Cdp::Client.new(@token, endpoint: @endpoint, client_id: @client_id)
end

#write(chunk) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_reckoner_cdp.rb', line 30

# Sends the records of a chunk to the client in batches, starting a new batch
# whenever adding the next record would bring the batch's stringified length
# to MAXIMUM_QUERY_STRING_LENGTH or more.
#
# Size is measured as the length of each record's #to_s, which is what the
# limit has always been compared against here; it is not necessarily the
# length of whatever the client puts on the wire.
#
# @param chunk [#each] yields two-element entries; the second is the record
# @return [Object, nil] result of the last insert, or nil when nothing was sent
def write(chunk)
  batch = []
  batch_size = 0

  chunk.each do |_, record|
    record_size = record.to_s.length

    # Flush what has been collected so far before it would reach the limit.
    # The emptiness guard keeps a lone oversized record from triggering an
    # insert with no records in it.
    if !batch.empty? && batch_size + record_size >= MAXIMUM_QUERY_STRING_LENGTH
      @client.streaming_insert(@workflow_id, batch)
      batch = []
      batch_size = 0
    end

    batch << record
    batch_size += record_size
  end

  # Nothing left over (e.g. an empty chunk) means there is nothing to insert.
  @client.streaming_insert(@workflow_id, batch) unless batch.empty?
end