Class: Fluent::Plugin::KinesisStreamsAggregatedOutput

Inherits:
  KinesisOutput < Object (show all)
Includes:
Fluent::Plugin::KinesisHelper::API::BatchRequest, Fluent::Plugin::KinesisHelper::Aggregator::Mixin
Defined in:
lib/fluent/plugin/out_kinesis_streams_aggregated.rb

Constant Summary collapse

RequestType = :streams_aggregated
BatchRequestLimitCount = 100_000
BatchRequestLimitSize = 1024 * 1024

Constants included from Fluent::Plugin::KinesisHelper::Aggregator::Mixin

Fluent::Plugin::KinesisHelper::Aggregator::Mixin::AggregateOffset, Fluent::Plugin::KinesisHelper::Aggregator::Mixin::RecordOffset

Constants included from Fluent::Plugin::KinesisHelper::API

Fluent::Plugin::KinesisHelper::API::MaxRecordSize

Instance Method Summary collapse

Methods included from Fluent::Plugin::KinesisHelper::API::BatchRequest

included

Methods included from Fluent::Plugin::KinesisHelper::Aggregator::Mixin

#aggregator, included

Methods inherited from KinesisOutput

#multi_workers_ready?

Methods included from Fluent::Plugin::KinesisHelper::API

included

Methods included from Fluent::Plugin::KinesisHelper::Client

#client, included

Instance Method Details

#configure(conf) ⇒ Object



32
33
34
35
36
37
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 32

# Plugin configuration hook: runs the parent setup, builds the partition-key
# generator, then shrinks both size limits by the per-record aggregation
# overhead so aggregated payloads still fit Kinesis limits.
def configure(conf)
  super
  @partition_key_generator = create_partition_key_generator
  # `offset` memoizes, so computing it once into a local is equivalent
  # to calling it twice. It must run after the generator is assigned.
  overhead = offset
  @batch_request_max_size -= overhead
  @max_record_size -= overhead
end

#format(tag, time, record) ⇒ Object



39
40
41
42
43
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 39

# Fluentd format hook: serializes one event via the configured data
# formatter, wrapped in a single-element array for the batch API helper.
def format(tag, time, record)
  format_for_api { [@data_formatter.call(tag, time, record)] }
end

#offsetObject



60
61
62
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 60

# Per-request byte overhead of the aggregated record format: the fixed
# aggregate header plus twice the partition-key length. Memoized, so the
# generator is only sampled once.
def offset
  @offset ||= begin
    key_length = @partition_key_generator.call.size
    AggregateOffset + key_length * 2
  end
end

#write(chunk) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 45

# Fluentd write hook: resolves stream-name placeholders for this chunk,
# then for each batch aggregates all records under one generated partition
# key and ships them as a single PutRecords entry.
def write(chunk)
  stream_name = extract_placeholders(@stream_name, chunk)
  write_records_batch(chunk) do |batch|
    partition_key = @partition_key_generator.call
    payloads = batch.map { |(data)| data }
    aggregated = {
      partition_key: partition_key,
      data: aggregator.aggregate(payloads, partition_key),
    }
    client.put_records(stream_name: stream_name, records: [aggregated])
  end
end