Class: FluentPluginKinesisAggregation::OutputFilter

Inherits: Fluent::BufferedOutput < Object

Includes: Fluent::DetachMultiProcessMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin

Defined in: lib/fluent/plugin/out_kinesis-aggregation.rb

Constant Summary

USER_AGENT_NAME =
'fluent-plugin-kinesis-aggregation-output-filter'

PUT_RECORD_MAX_DATA_SIZE =
1024 * 1024

FLUENTD_MAX_BUFFER_SIZE =
PUT_RECORD_MAX_DATA_SIZE - 200

The 200 bytes subtracted from the 1 MB PutRecord limit are an arbitrary margin: larger than the envelope overhead, and big enough to store the partition/hash key table in the AggregatedRecord. Note that you shouldn't really ever set the buffer this high, since you're likely to fail the write if anyone else is writing to the shard at the time.

Instance Method Summary

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 88

def configure(conf)
  super

  if @buffer.chunk_limit > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError, "Kinesis buffer_chunk_limit is set to more than the 1MB shard limit (i.e. you won't be able to write your chunks!)"
  end

  if @buffer.chunk_limit > FLUENTD_MAX_BUFFER_SIZE / 3
    log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per-second shard limit (1MB). This is not good if you have many producers.'
  end
end
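
For reference, a minimal match section that stays within these checks might look like the sketch below. It assumes the plugin type is kinesis-aggregation (derived from the filename out_kinesis-aggregation.rb) and uses the stream_name and fixed_partition_key parameters referenced in #write; fixed_partition_key is optional (a random key is used per chunk otherwise). Check the plugin README for the authoritative parameter list.

<match your.tag.**>
  type kinesis-aggregation
  stream_name YOUR_STREAM
  region us-east-1
  fixed_partition_key foo        # optional; omit to use a random partition key per chunk
  buffer_chunk_limit 300k        # comfortably under the 1/3-of-1MB warning threshold above
</match>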

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 107

def format(tag, time, record)
  return AggregatedRecord.new(
    records: [Record.new(
      partition_key_index: 0,
      data: Yajl.dump(record)
    )]
  ).encode
end
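
Each event is thus formatted as a complete, single-record AggregatedRecord. That is what makes the concatenation in #write below legal: when the only populated field of a protobuf message is a repeated field, concatenating two serialized messages decodes to one message with the combined list. A quick sketch, assuming the generated class exposes the usual decode alongside encode (as google-protobuf generated classes do):

a = AggregatedRecord.new(records: [Record.new(partition_key_index: 0, data: '{"n":1}')]).encode
b = AggregatedRecord.new(records: [Record.new(partition_key_index: 0, data: '{"n":2}')]).encode
AggregatedRecord.decode(a + b).records.map(&:data)
# => ["{\"n\":1}", "{\"n\":2}"]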

#start ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 100

def start
  detach_multi_process do
    super
    load_client
  end
end

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 116

def write(chunk)
  records = chunk.read
  if records.length > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated record of length #{records.length} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # Confusing magic: because of how protobuf encodes messages, it's
  # valid (in this case) to concatenate serialized AggregatedRecords
  # to form one AggregatedRecord, since the only fields populated
  # (partition_key_table here, records in #format) are repeated fields.
  message = AggregatedRecord.new(
    partition_key_table: [partition_key]
  ).encode + records

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end
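
The kpl_aggregation_pack helper (defined elsewhere in the plugin) wraps the payload in the KPL aggregated-record envelope so that KCL-based consumers can deaggregate it. The documented envelope is the magic bytes 0xF3 0x89 0x9A 0xC2, then the protobuf payload, then the MD5 digest of that payload. A minimal sketch of such a helper, under the assumption that the plugin follows the standard KPL aggregation format:

require 'digest/md5'

KPL_MAGIC_NUMBER = "\xF3\x89\x9A\xC2".b

def kpl_aggregation_pack(message)
  # magic number + protobuf payload + MD5 checksum of the payload
  KPL_MAGIC_NUMBER + message + Digest::MD5.digest(message)
end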