Class: Fluent::Plugin::KinesisStreamsAggregatedOutput
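Overview

This output plugin packs Fluentd events into single KPL-aggregated Kinesis records (see #write below). As a minimal usage sketch, with stream name and region as placeholders rather than defaults:

<match mytag.**>
  @type kinesis_streams_aggregated
  stream_name my-stream   # placeholder
  region us-east-1        # placeholder
</match>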
Constant Summary

- RequestType = :streams_aggregated
- BatchRequestLimitCount = 100_000
- BatchRequestLimitSize = 1024 * 1024
Constants included from Fluent::Plugin::KinesisHelper::Aggregator::Mixin: AggregateOffset, RecordOffset

Constants included from Fluent::Plugin::KinesisHelper::API: MaxRecordSize
Instance Method Summary

- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #offset ⇒ Object
- #write(chunk) ⇒ Object

Methods included from helper mixins: #aggregator, #client, #multi_workers_ready?
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 32

def configure(conf)
  super
  # Generator that supplies the partition key used for each aggregated batch.
  @partition_key_generator = create_partition_key_generator
  # Shrink both limits by #offset to reserve headroom for the
  # aggregation envelope and the partition key.
  @batch_request_max_size -= offset
  @max_record_size -= offset
end
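To see what the subtraction buys: with the 1 MiB BatchRequestLimitSize above, the effective payload budget per request shrinks by #offset bytes. A rough sketch, using a hypothetical offset value (see #offset below for the real computation):

batch_request_limit_size = 1024 * 1024  # BatchRequestLimitSize
offset = 153                            # hypothetical value; see #offset
effective_max_size = batch_request_limit_size - offset
# => 1_048_423 bytes left for the aggregated payload itself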
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 39

def format(tag, time, record)
  format_for_api do
    # Serialize the event and wrap it in a one-element array; the
    # entries are aggregated into a single Kinesis record in #write.
    [@data_formatter.call(tag, time, record)]
  end
end
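As a sketch of the shape this produces, assuming a JSON-line formatter stands in for the configurable @data_formatter (an assumption, not the plugin's fixed behavior):

require 'json'

# Hypothetical stand-in for @data_formatter.
data_formatter = ->(tag, time, record) { record.to_json + "\n" }

entry = [data_formatter.call("app.log", Time.now.to_i, {"msg" => "hi"})]
# => ["{\"msg\":\"hi\"}\n"], the one-element array #format yields per event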
#offset ⇒ Object
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 60

def offset
  # Envelope overhead plus twice the partition key length, memoized.
  @offset ||= AggregateOffset + @partition_key_generator.call.size * 2
end
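For a concrete number, suppose the generator returns a 64-character random hex key and AggregateOffset is 25 bytes (both are assumptions for illustration; the key source and the constant live elsewhere in the helper mixins):

require 'securerandom'

aggregate_offset = 25              # assumed value of AggregateOffset
key = SecureRandom.hex(32)         # assumed generator: 64-char hex string
offset = aggregate_offset + key.size * 2
# => 153 bytes subtracted from both limits in #configure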
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis_streams_aggregated.rb', line 45

def write(chunk)
  stream_name = extract_placeholders(@stream_name, chunk)
  write_records_batch(chunk) do |batch|
    # One partition key per batch: the whole aggregated blob lands on
    # the single shard this key hashes to.
    key = @partition_key_generator.call
    records = batch.map{|(data)|data}
    client.put_records(
      stream_name: stream_name,
      records: [{
        partition_key: key,
        # Pack the entire batch into one KPL-aggregated record.
        data: aggregator.aggregate(records, key),
      }],
    )
  end
end
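Since each request carries a single KPL-aggregated record, consumers must deaggregate it. A minimal sketch of recognizing the KPL envelope (4-byte magic number, protobuf payload, trailing 16-byte MD5 digest); full deaggregation additionally requires the KPL protobuf schema:

require 'digest/md5'

KPL_MAGIC = ["F3899AC2"].pack("H*") # KPL aggregation magic number

# True if blob looks like a KPL-aggregated record: magic-number prefix
# and a valid MD5 digest over the protobuf payload between them.
def kpl_aggregated?(blob)
  return false if blob.bytesize <= 20
  payload = blob.byteslice(4, blob.bytesize - 20)
  blob.byteslice(0, 4) == KPL_MAGIC &&
    Digest::MD5.digest(payload) == blob.byteslice(-16, 16)
end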