Class: Fluent::Plugin::KinesisFirehoseOutput

Inherits:
KinesisOutput
  • Object
show all
Includes:
Fluent::Plugin::KinesisHelper::API::BatchRequest
Defined in:
lib/fluent/plugin/out_kinesis_firehose.rb

Constant Summary collapse

RequestType =
:firehose
BatchRequestLimitCount =
500
BatchRequestLimitSize =
4 * 1024 * 1024

Constants included from Fluent::Plugin::KinesisHelper::API

Fluent::Plugin::KinesisHelper::API::MaxRecordSize

Instance Method Summary collapse

Methods included from Fluent::Plugin::KinesisHelper::API::BatchRequest

included, #size_of_values

Methods inherited from KinesisOutput

#multi_workers_ready?

Methods included from Fluent::Plugin::KinesisHelper::API

included

Methods included from Fluent::Plugin::KinesisHelper::Client

#client, included

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 30

def configure(conf)
  super
  if @append_new_line
    org_data_formatter = @data_formatter
    @data_formatter = ->(tag, time, record) {
      org_data_formatter.call(tag, time, record).chomp + "\n"
    }
  end
end

#format(tag, time, record) ⇒ Object



40
41
42
43
44
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 40

def format(tag, time, record)
  format_for_api do
    [@data_formatter.call(tag, time, record)]
  end
end

#write(chunk) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 46

def write(chunk)
  write_records_batch(chunk) do |batch|
    records = batch.map{|(data)|
      { data: data }
    }
    client.put_record_batch(
      delivery_stream_name: @delivery_stream_name,
      records: records,
    )
  end
end