Class: Fluent::KinesisHelper::Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/kinesis_helper/aggregator.rb

Defined Under Namespace

Modules: Mixin Classes: InvalidEncodingError

Constant Summary collapse

AggregatedRecord =
Google::Protobuf::DescriptorPool.generated_pool.lookup("AggregatedRecord").msgclass
Tag =
Google::Protobuf::DescriptorPool.generated_pool.lookup("Tag").msgclass
Record =
Google::Protobuf::DescriptorPool.generated_pool.lookup("Record").msgclass
MagicNumber =
['F3899AC2'].pack('H*')

Instance Method Summary collapse

Instance Method Details

#aggregate(records, partition_key) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 47

def aggregate(records, partition_key)
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key],
    records: records.map{|data|
      Record.new(partition_key_index: 1, data: data)
    },
  ))
  [MagicNumber, message, Digest::MD5.digest(message)].pack("A4A*A16")
end

#aggregated?(encoded) ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 71

def aggregated?(encoded)
  encoded[0..3] == MagicNumber
end

#aggregated_size_offset(partition_key) ⇒ Object



75
76
77
78
79
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 75

def aggregated_size_offset(partition_key)
  data = 'd'
  encoded = aggregate([record(data)], partition_key)
  finalize(encoded).size - data.size
end

#deaggregate(encoded) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/kinesis_helper/aggregator.rb', line 57

def deaggregate(encoded)
  unless aggregated?(encoded)
    raise InvalidEncodingError, "Invalid MagicNumber #{encoded[0..3]}}"
  end
  message, digest = encoded[4..encoded.length-17], encoded[encoded.length-16..-1]
  if Digest::MD5.digest(message) != digest
    raise InvalidEncodingError, "Digest mismatch #{digest}"
  end
  decoded = AggregatedRecord.decode(message)
  records = decoded.records.map(&:data)
  partition_key = decoded.partition_key_table[1]
  [records, partition_key]
end