Class: Fluent::Plugin::KinesisStreamsOutput
Constant Summary
collapse
- RequestType = :streams
- BatchRequestLimitCount = 500
- BatchRequestLimitSize = 5 * 1024 * 1024
Fluent::Plugin::KinesisHelper::API::MaxRecordSize
Instance Method Summary
collapse
included, #size_of_values
#multi_workers_ready?
included
#client, included
Instance Method Details
30
31
32
33
|
# File 'lib/fluent/plugin/out_kinesis_streams.rb', line 30
# Runs the parent class's configuration step, then builds the partition-key
# formatter and stores it in @key_formatter for later use by #format.
#
# @param conf the plugin configuration, passed through unchanged to the parent
# @return the formatter produced by key_formatter_create
def configure(conf)
  super(conf)
  @key_formatter = key_formatter_create
end
|
35
36
37
38
39
40
41
|
# File 'lib/fluent/plugin/out_kinesis_streams.rb', line 35
# Turns one event into a [data, partition_key] pair, built inside the
# format_for_api block.
#
# The data formatter is invoked before the key formatter, so any side effect
# the data formatter has on the record is visible to the key formatter.
#
# @param tag event tag, handed to the data formatter
# @param time event time, handed to the data formatter
# @param record event record, handed to both formatters
# @return whatever format_for_api returns for the yielded pair
def format(tag, time, record)
  format_for_api do
    [@data_formatter.call(tag, time, record), @key_formatter.call(record)]
  end
end
|
#write(chunk) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/fluent/plugin/out_kinesis_streams.rb', line 43
# Sends the contents of a buffer chunk through the client's put_records call,
# one call per batch yielded by write_records_batch.
#
# The target stream name is resolved once per chunk by running the configured
# @stream_name through extract_placeholders, so every batch of the chunk goes
# to the same stream.
# NOTE(review): extract_placeholders is assumed to be the helper inherited
# from the Fluentd output plugin base class — confirm against the class
# hierarchy.
#
# @param chunk the buffer chunk whose records are to be delivered
# @return whatever write_records_batch returns
def write(chunk)
  stream_name = extract_placeholders(@stream_name, chunk)
  write_records_batch(chunk) do |batch|
    # Each batch entry is the [data, partition_key] pair produced by #format.
    records = batch.map do |(data, partition_key)|
      { data: data, partition_key: partition_key }
    end
    client.put_records(
      stream_name: stream_name,
      records: records,
    )
  end
end
|