Class: Fluent::KinesisHelper::Aggregator
- Inherits:
-
Object
- Object
- Fluent::KinesisHelper::Aggregator
show all
- Defined in:
- lib/fluent/plugin/kinesis_helper/aggregator.rb
Defined Under Namespace
Modules: Mixin
Classes: InvalidEncodingError
Constant Summary
collapse
- AggregatedRecord =
Google::Protobuf::DescriptorPool.generated_pool.lookup("AggregatedRecord").msgclass
- Tag =
Google::Protobuf::DescriptorPool.generated_pool.lookup("Tag").msgclass
- Record =
Google::Protobuf::DescriptorPool.generated_pool.lookup("Record").msgclass
- MagicNumber =
['F3899AC2'].pack('H*')
Instance Method Summary
collapse
Instance Method Details
#aggregate(records, partition_key) ⇒ Object
47
48
49
50
51
52
53
54
55
|
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 47
def aggregate(records, partition_key)
message = AggregatedRecord.encode(AggregatedRecord.new(
partition_key_table: ['a', partition_key],
records: records.map{|data|
Record.new(partition_key_index: 1, data: data)
},
))
[MagicNumber, message, Digest::MD5.digest(message)].pack("A4A*A16")
end
|
#aggregated?(encoded) ⇒ Boolean
71
72
73
|
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 71
def aggregated?(encoded)
encoded[0..3] == MagicNumber
end
|
#aggregated_size_offset(partition_key) ⇒ Object
75
76
77
78
79
|
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 75
def aggregated_size_offset(partition_key)
data = 'd'
encoded = aggregate([record(data)], partition_key)
finalize(encoded).size - data.size
end
|
#deaggregate(encoded) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 57
def deaggregate(encoded)
unless aggregated?(encoded)
raise InvalidEncodingError, "Invalid MagicNumber #{encoded[0..3]}}"
end
message, digest = encoded[4..encoded.length-17], encoded[encoded.length-16..-1]
if Digest::MD5.digest(message) != digest
raise InvalidEncodingError, "Digest mismatch #{digest}"
end
decoded = AggregatedRecord.decode(message)
records = decoded.records.map(&:data)
partition_key = decoded.partition_key_table[1]
[records, partition_key]
end
|