Class: Kafka::Message
- Inherits: Object
- Defined in: lib/kafka/message.rb
Overview
A message. The format of a message is as follows:
- 4-byte big-endian int: length of the message in bytes (including the rest of the header, but excluding the length field itself)
- 1 byte: “magic” identifier (format version number)
- If the magic byte == 0, there is one more header field:
  - 4-byte big-endian int: CRC32 checksum of the payload
- If the magic byte == 1, there are two more header fields:
  - 1 byte: “attributes” (flags for compression codec, etc.)
  - 4-byte big-endian int: CRC32 checksum of the payload
All following bytes are the payload.
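To make the byte layout concrete, here is a minimal Ruby sketch that serializes one payload in each format. The helpers encode_v0 and encode_v1 are hypothetical (they are not part of kafka-rb); they rely on the Array#pack directives 'N' (32-bit big-endian unsigned) and 'C' (8-bit unsigned), and use Zlib.crc32 for the checksum, as #calculate_checksum does.

```ruby
require 'zlib'

# Hypothetical helpers, not part of kafka-rb. Array#pack directives:
# 'N' = 32-bit big-endian unsigned int, 'C' = 8-bit unsigned int.
def encode_v0(payload)
  payload = payload.b                                  # treat the payload as raw bytes
  body = [0, Zlib.crc32(payload)].pack('CN') + payload # magic 0, CRC32, payload
  [body.bytesize].pack('N') + body                     # length field excludes itself
end

def encode_v1(payload, attributes = 0)
  payload = payload.b
  # magic 1, attributes byte, CRC32, payload
  body = [1, attributes, Zlib.crc32(payload)].pack('CCN') + payload
  [body.bytesize].pack('N') + body
end

v0 = encode_v0('hello')
v1 = encode_v1('hello')
p v0.bytesize      # => 14 (4 + 1 + 4 + 5)
p v0.unpack1('N')  # => 10 (1 + 4 + 5)
p v1.unpack1('N')  # => 11 (1 + 1 + 4 + 5)
```

The length prefix counts everything after itself, which is why a reader advances by that value plus 4 to reach the next message.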
Constant Summary
- MAGIC_IDENTIFIER_DEFAULT = 0
- BASIC_MESSAGE_HEADER = 'NC'.freeze
- VERSION_0_HEADER = 'N'.freeze
- VERSION_1_HEADER = 'CN'.freeze
- COMPRESSION_CODEC_MASK = 0x03
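The directive strings above map onto the header fields: 'N' is a 32-bit big-endian unsigned integer and 'C' an 8-bit unsigned integer, so BASIC_MESSAGE_HEADER reads the size and magic byte, and VERSION_1_HEADER reads the attributes byte and checksum. The sketch below decodes a hand-built version-1 header; the checksum 0xDEADBEEF is a placeholder rather than a real CRC32, and three of the constants are copied so the snippet runs on its own.

```ruby
# Copies of three of the constants above, so this snippet runs on its own.
BASIC_MESSAGE_HEADER   = 'NC'.freeze # 32-bit big-endian size, then 8-bit magic
VERSION_1_HEADER       = 'CN'.freeze # 8-bit attributes, then 32-bit big-endian CRC32
COMPRESSION_CODEC_MASK = 0x03        # low two bits of the attributes byte

# Hand-built version-1 header: size 11, magic 1, attributes 0x01, placeholder checksum.
header = [11, 1].pack(BASIC_MESSAGE_HEADER) + [0x01, 0xDEADBEEF].pack(VERSION_1_HEADER)

size, magic = header[0, 5].unpack(BASIC_MESSAGE_HEADER)
attributes, checksum = header[5, 5].unpack(VERSION_1_HEADER)
codec = attributes & COMPRESSION_CODEC_MASK

p [size, magic, attributes, format('%08x', checksum), codec] # => [11, 1, 1, "deadbeef", 1]
p(0b0000_0101 & COMPRESSION_CODEC_MASK)                       # => 1 (higher bits are ignored)
```

In .parse_from, a masked codec value of 0 means an uncompressed message and 1 means a gzip-compressed message set.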
Instance Attribute Summary
- #checksum ⇒ Object
  Returns the value of attribute checksum.
- #magic ⇒ Object
  Returns the value of attribute magic.
- #payload ⇒ Object
  Returns the value of attribute payload.
Class Method Summary
- .parse_from(data) ⇒ Object
  Takes a byte string containing one or more messages; returns a MessageSet with the messages parsed from the string, and the number of bytes consumed from the string.
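Instance Method Summary
- #calculate_checksum ⇒ Object
  Returns the CRC32 checksum of the payload.
- #initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message
  (constructor) A new instance of Message.
- #valid? ⇒ Boolean
  Returns true if the stored checksum matches the CRC32 of the current payload.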
Constructor Details
#initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message
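Returns a new instance of Message. A nil payload is replaced by an empty string, and if no checksum is given, it is computed from the payload with #calculate_checksum.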
    # File 'lib/kafka/message.rb', line 43
    def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
      self.magic    = magic
      self.payload  = payload || ""
      self.checksum = checksum || self.calculate_checksum
    end
Instance Attribute Details
#checksum ⇒ Object
Returns the value of attribute checksum.
    # File 'lib/kafka/message.rb', line 41
    def checksum
      @checksum
    end
#magic ⇒ Object
Returns the value of attribute magic.
    # File 'lib/kafka/message.rb', line 41
    def magic
      @magic
    end
#payload ⇒ Object
Returns the value of attribute payload.
    # File 'lib/kafka/message.rb', line 41
    def payload
      @payload
    end
Class Method Details
.parse_from(data) ⇒ Object
Takes a byte string containing one or more messages; returns a MessageSet with the messages parsed from the string, and the number of bytes consumed from the string.
    # File 'lib/kafka/message.rb', line 60
    def self.parse_from(data)
      messages = []
      bytes_processed = 0

      while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
        message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
        break if bytes_processed + message_size + 4 > data.length # message is truncated

        case magic
        when 0
          # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ...
          # |                   |     |                       |
          # |   message_size    |magic|       checksum        | payload ...
          payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
          checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
          payload  = data[bytes_processed + 9, payload_size]
          messages << Kafka::Message.new(payload, magic, checksum)

        when 1
          # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ...
          # |                   |     |     |                       |
          # |       size        |magic|attrs|        checksum       | payload ...
          payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
          attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
          payload = data[bytes_processed + 10, payload_size]

          case attributes & COMPRESSION_CODEC_MASK
          when 0 # a single uncompressed message
            messages << Kafka::Message.new(payload, magic, checksum)
          when 1 # a gzip-compressed message set -- parse recursively
            uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
            message_set = parse_from(uncompressed)
            raise 'malformed compressed message' if message_set.size != uncompressed.size
            messages.concat(message_set.messages)
          else
            # https://cwiki.apache.org/confluence/display/KAFKA/Compression
            # claims that 2 is for Snappy compression, but Kafka's Scala client
            # implementation doesn't seem to support it yet, so I don't have
            # a reference implementation to test against.
            raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
          end

        else
          raise "Unsupported Kafka message version: magic number #{magic}"
        end

        bytes_processed += message_size + 4 # 4 = sizeof(message_size)
      end

      MessageSet.new(bytes_processed, messages)
    end
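The truncation check and the recursive gzip branch are easiest to see end to end. The following is a simplified, self-contained sketch of the same loop, not the gem's code: Msg, MsgSet, encode and parse are hypothetical names, plain Structs stand in for Kafka::Message and MessageSet, and Zlib.gzip (Ruby 2.4+) is used to build a compressed message set. It uses byteslice/bytesize so offsets stay byte-based regardless of the string's encoding.

```ruby
require 'zlib'
require 'stringio'

# Hypothetical stand-ins for Kafka::Message and MessageSet.
Msg    = Struct.new(:payload, :magic, :checksum)
MsgSet = Struct.new(:size, :messages) # size = bytes consumed

# Hypothetical encoder: magic 0 => [size][magic][crc], magic 1 => [size][magic][attrs][crc].
def encode(payload, magic: 0, attributes: 0)
  payload = payload.b
  crc = Zlib.crc32(payload)
  header = magic.zero? ? [0, crc].pack('CN') : [1, attributes, crc].pack('CCN')
  body = header + payload
  [body.bytesize].pack('N') + body
end

# Simplified version of the parse_from loop shown above.
def parse(data)
  messages = []
  pos = 0
  while pos <= data.bytesize - 5
    size, magic = data.byteslice(pos, 5).unpack('NC')
    break if pos + size + 4 > data.bytesize # truncated: leave it unconsumed
    case magic
    when 0
      crc = data.byteslice(pos + 5, 4).unpack1('N')
      messages << Msg.new(data.byteslice(pos + 9, size - 5), magic, crc)
    when 1
      attrs, crc = data.byteslice(pos + 5, 5).unpack('CN')
      payload = data.byteslice(pos + 10, size - 6)
      case attrs & 0x03
      when 0 # uncompressed
        messages << Msg.new(payload, magic, crc)
      when 1 # gzip: the payload is itself a message set
        inner = Zlib::GzipReader.new(StringIO.new(payload)).read
        nested = parse(inner)
        raise 'malformed compressed message' if nested.size != inner.bytesize
        messages.concat(nested.messages)
      else
        raise "unsupported codec #{attrs & 0x03}"
      end
    else
      raise "unsupported magic #{magic}"
    end
    pos += size + 4 # 4 = the size field itself
  end
  MsgSet.new(pos, messages)
end

# Usage: one plain message, one gzip-wrapped set of two, then a truncated fragment.
wrapped = encode(Zlib.gzip(encode('a') + encode('b')), magic: 1, attributes: 1)
buf = encode('x') + wrapped + encode('tail').byteslice(0, 6)
set = parse(buf)
p set.messages.map(&:payload) # => ["x", "a", "b"]
p buf.bytesize - set.size     # => 6
```

Because the truncated fragment is not counted in the consumed size, a caller can keep those trailing bytes and retry once more data arrives.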
Instance Method Details
#calculate_checksum ⇒ Object
    # File 'lib/kafka/message.rb', line 49
    def calculate_checksum
      Zlib.crc32(self.payload)
    end
#valid? ⇒ Boolean
    # File 'lib/kafka/message.rb', line 53
    def valid?
      self.checksum == calculate_checksum
    end
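Because the constructor computes the checksum only when none is passed in, #valid? can detect a payload that no longer matches its stored CRC32. The sketch below uses a hypothetical SketchMessage class that copies the three methods shown above (#initialize, #calculate_checksum, #valid?) so it runs without the gem; it assumes writable attributes, as the constructor's use of self.payload = implies.

```ruby
require 'zlib'

# Hypothetical stand-in copying initialize, calculate_checksum and valid? from above.
class SketchMessage
  attr_accessor :payload, :magic, :checksum

  def initialize(payload = nil, magic = 0, checksum = nil)
    self.magic    = magic
    self.payload  = payload || ""
    self.checksum = checksum || calculate_checksum # computed only if not supplied
  end

  def calculate_checksum
    Zlib.crc32(payload)
  end

  def valid?
    checksum == calculate_checksum
  end
end

fresh = SketchMessage.new('hello')
tampered = SketchMessage.new('hello')
tampered.payload = 'hellO' # payload changed after the checksum was fixed

p fresh.valid?                               # => true
p tampered.valid?                            # => false
p SketchMessage.new.checksum                 # => 0 (CRC32 of the empty payload)
p SketchMessage.new('hello', 0, 12345).valid? # => false (supplied checksum is wrong)
```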