Class: FluentPluginKinesisAggregation::OutputFilter
- Inherits:
- Fluent::BufferedOutput
- Object
- Fluent::BufferedOutput
- FluentPluginKinesisAggregation::OutputFilter
- Includes:
- Fluent::DetachMultiProcessMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_kinesis-aggregation.rb
Constant Summary
- USER_AGENT_NAME =
'fluent-plugin-kinesis-aggregation-output-filter'
- PUT_RECORD_MAX_DATA_SIZE =
1024 * 1024
- FLUENTD_MAX_BUFFER_SIZE =
PUT_RECORD_MAX_DATA_SIZE - 200
The 200 is an arbitrary margin: larger than the envelope overhead and big enough to hold the partition/hash key table in AggregatedRecords. Note that the buffer should never really be set this high, since the write is likely to fail if anyone else is writing to the shard at the same time.
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 88
def configure(conf)
  super

  if @buffer.chunk_limit > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError, "Kinesis buffer_chunk_limit is set to more than the 1mb shard limit (i.e. you won't be able to write your chunks!"
  end

  if @buffer.chunk_limit > FLUENTD_MAX_BUFFER_SIZE / 3
    log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per second shard limit (1mb). This is not good if you have many producers.'
  end
end
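The two thresholds checked in #configure follow directly from the constants in the summary. The sketch below reproduces that arithmetic outside Fluentd so the byte values are easy to see. The module name `ChunkLimitSketch` and the helper `classify` are hypothetical and are not part of the plugin.

```ruby
# Hypothetical stand-alone sketch; not part of the plugin.
module ChunkLimitSketch
  # Values copied from the constant summary.
  PUT_RECORD_MAX_DATA_SIZE = 1024 * 1024
  FLUENTD_MAX_BUFFER_SIZE = PUT_RECORD_MAX_DATA_SIZE - 200

  # Mirrors the two comparisons in #configure:
  #   :error where #configure raises Fluent::ConfigError
  #   :warn  where #configure only logs a warning
  #   :ok    otherwise
  def self.classify(chunk_limit)
    if chunk_limit > FLUENTD_MAX_BUFFER_SIZE
      :error
    elsif chunk_limit > FLUENTD_MAX_BUFFER_SIZE / 3 # integer division
      :warn
    else
      :ok
    end
  end
end

puts ChunkLimitSketch::FLUENTD_MAX_BUFFER_SIZE      # 1048376
puts ChunkLimitSketch::FLUENTD_MAX_BUFFER_SIZE / 3  # 349458
puts ChunkLimitSketch.classify(300 * 1024)          # ok
puts ChunkLimitSketch.classify(512 * 1024)          # warn
```

Under these numbers, a chunk limit of roughly 341 KiB or less avoids both the error and the warning.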
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 107
def format(tag, time, record)
  return AggregatedRecord.new(
    records: [Record.new(
      partition_key_index: 0,
      data: Yajl.dump(record)
    )]
  ).encode
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 100
def start
  detach_multi_process do
    super
    load_client
  end
end
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 116
def write(chunk)
  records = chunk.read
  if records.length > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated record of length #{records.length} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # confusing magic. Because of the format of protobuf records,
  # it's valid (in this case) to concatenate the AggregatedRecords
  # to form one AggregatedRecord, since we only have a repeated field
  # in records.
  message = AggregatedRecord.new(
    partition_key_table: [partition_key]
  ).encode + records

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end
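The "confusing magic" comment in #write relies on a property of the protobuf wire format: a message is a flat sequence of tagged fields, so concatenating two encoded messages yields a valid message in which repeated fields are merged. The sketch below illustrates this with a hand-rolled encoder and decoder rather than the plugin's generated classes. The module `ProtoSketch`, its helpers, and the example partition key are hypothetical. The field numbers (AggregatedRecord: partition_key_table = 1, records = 3; Record: partition_key_index = 1, data = 3) are assumed from the KPL aggregation schema and are not shown on this page.

```ruby
# Hypothetical stand-alone sketch; supports wire types 0 (varint) and 2 (bytes) only.
module ProtoSketch
  # Encode a non-negative integer as a base-128 varint.
  def self.varint(n)
    bytes = []
    loop do
      byte = n & 0x7F
      n >>= 7
      if n.zero?
        bytes << byte
        break
      end
      bytes << (byte | 0x80) # set the continuation bit
    end
    bytes.pack('C*')
  end

  # Encode one length-delimited field (wire type 2).
  def self.bytes_field(number, payload)
    payload = payload.b
    varint((number << 3) | 2) + varint(payload.bytesize) + payload
  end

  # Encode one varint field (wire type 0).
  def self.varint_field(number, value)
    varint(number << 3) + varint(value)
  end

  # Decode top-level fields into [field_number, value] pairs.
  def self.decode(bytes)
    bytes = bytes.b
    pos = 0
    read_varint = lambda do
      result = 0
      shift = 0
      loop do
        byte = bytes.getbyte(pos)
        pos += 1
        result |= (byte & 0x7F) << shift
        shift += 7
        break if (byte & 0x80).zero?
      end
      result
    end
    fields = []
    while pos < bytes.bytesize
      key = read_varint.call
      case key & 7
      when 0
        fields << [key >> 3, read_varint.call]
      when 2
        len = read_varint.call
        fields << [key >> 3, bytes.byteslice(pos, len)]
        pos += len
      else
        raise ArgumentError, "unsupported wire type #{key & 7}"
      end
    end
    fields
  end

  # One-record AggregatedRecord, analogous to what #format emits per event.
  def self.single_record(json)
    bytes_field(3, varint_field(1, 0) + bytes_field(3, json))
  end

  # Build a header plus a two-record chunk, then decode the concatenation.
  def self.demo
    chunk = single_record('{"a":1}') + single_record('{"b":2}')
    message = bytes_field(1, 'example-partition-key') + chunk
    fields = decode(message)
    {
      keys: fields.select { |n, _| n == 1 }.map(&:last),
      data: fields.select { |n, _| n == 3 }.map { |_, v| decode(v).assoc(3).last }
    }
  end
end

p ProtoSketch.demo
```

The decoded result contains one partition key and both records, which is why every record can safely carry `partition_key_index: 0`: the single key prepended in #write becomes entry 0 of the merged table.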