Class: Fluent::Plugin::KinesisHelper::Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/kinesis_helper/aggregator.rb

Defined Under Namespace

Modules: Mixin
Classes: InvalidEncodingError

Constant Summary collapse

# Protobuf message classes, looked up by name in the generated descriptor pool.
AggregatedRecord = Google::Protobuf::DescriptorPool.generated_pool.lookup("AggregatedRecord").msgclass
Tag = Google::Protobuf::DescriptorPool.generated_pool.lookup("Tag").msgclass
Record = Google::Protobuf::DescriptorPool.generated_pool.lookup("Record").msgclass
# Four-byte marker (hex F3 89 9A C2) that #aggregate puts at the front of every
# blob and #aggregated? looks for. Frozen so that the single shared String
# cannot be modified in place by any caller.
MagicNumber = ['F3899AC2'].pack('H*').freeze

Instance Method Summary collapse

Instance Method Details

#aggregate(records, partition_key) ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 31

# Bundles several record payloads into one aggregated blob.
#
# The returned string is the concatenation of MagicNumber, the encoded
# AggregatedRecord message, and the MD5 digest of that encoded message.
#
# @param records [Array] payloads; each one becomes the +data+ of a Record
# @param partition_key [Object] stored at index 1 of the partition key table
# @return [String] the packed blob
def aggregate(records, partition_key)
  # Every entry points at index 1 of the key table; index 0 holds the
  # placeholder 'a'.
  entries = records.map do |payload|
    Record.new(partition_key_index: 1, data: payload)
  end
  container = AggregatedRecord.new(
    partition_key_table: ['a', partition_key],
    records: entries
  )
  message = AggregatedRecord.encode(container)
  checksum = Digest::MD5.digest(message)
  # A4 = 4 bytes of magic, A* = the whole message, A16 = 16 bytes of digest.
  [MagicNumber, message, checksum].pack("A4A*A16")
end

#aggregated?(encoded) ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 55

# Tells whether a blob starts with the aggregation marker.
#
# The comparison is done on raw bytes: the first four bytes are copied into a
# binary-encoded string before being compared with MagicNumber, so the answer
# does not depend on the encoding the incoming string happens to be tagged
# with. (Character-based slicing plus String#== reports "not equal" for
# strings whose encodings are incompatible, even when the bytes match.)
#
# @param encoded [String] blob to inspect
# @return [Boolean] true when the first four bytes equal MagicNumber
def aggregated?(encoded)
  encoded.byteslice(0, 4).b == MagicNumber
end

#aggregated_size_offset(partition_key) ⇒ Object



59
60
61
62
63
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 59

# Measures how much larger than its payload a finalized single-record
# aggregate is for the given partition key.
#
# A one-character payload is wrapped with +record+, aggregated, and passed
# through +finalize+; the payload's own size is then subtracted from the size
# of the result. (+record+ and +finalize+ are defined outside this method.)
#
# @param partition_key [Object] key handed to #aggregate
# @return [Integer] size of the finalized aggregate minus the payload size
def aggregated_size_offset(partition_key)
  probe = 'd'
  finalized = finalize(aggregate([record(probe)], partition_key))
  finalized.size - probe.size
end

#deaggregate(encoded) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 41

# Splits an aggregated blob back into its record payloads and partition key.
#
# The layout read here mirrors what #aggregate writes: MagicNumber, then the
# encoded AggregatedRecord message, then a 16-byte MD5 digest of that message.
#
# @param encoded [String] blob to decode; treated as raw bytes whatever
#   encoding it is tagged with
# @return [Array] two elements: the list of record +data+ values, and the
#   entry at index 1 of the decoded partition key table
# @raise [InvalidEncodingError] when the blob does not start with MagicNumber,
#   is too short to hold a digest, or its digest does not match the message
def deaggregate(encoded)
  # Work on a binary-encoded copy so every offset below counts bytes.
  bytes = encoded.b
  unless aggregated?(bytes)
    # inspect keeps the message printable even when the prefix is binary.
    raise InvalidEncodingError, "Invalid MagicNumber #{bytes[0..3].inspect}"
  end

  header_size = MagicNumber.bytesize
  digest_size = 16
  if bytes.bytesize < header_size + digest_size
    raise InvalidEncodingError, "Truncated aggregated record (#{bytes.bytesize} bytes)"
  end

  message = bytes[header_size...-digest_size]
  digest = bytes[-digest_size..-1]
  if Digest::MD5.digest(message) != digest
    # Report the digest as hex rather than embedding raw bytes in the message.
    raise InvalidEncodingError, "Digest mismatch #{digest.unpack('H*').first}"
  end

  decoded = AggregatedRecord.decode(message)
  records = decoded.records.map(&:data)
  partition_key = decoded.partition_key_table[1]
  [records, partition_key]
end