Class: Kafka::Message

Inherits: Object

Defined in: lib/kafka/message.rb

Overview

A message. The format of a message is as follows:

4 byte big-endian int: length of the message in bytes (including the rest of the header, but excluding the length field itself)

1 byte: “magic” identifier (format version number)

If the magic byte == 0, there is one more header field:

4 byte big-endian int: CRC32 checksum of the payload

If the magic byte == 1, there are two more header fields:

1 byte: “attributes” (flags for compression, codec etc)

4 byte big-endian int: CRC32 checksum of the payload

All following bytes are the payload.
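As an illustration of this layout, here is a minimal sketch (not part of the library) of how a version-0 message could be assembled by hand; the payload value is an assumption:

require 'zlib'

payload  = "hello kafka"                    # assumed example payload
checksum = Zlib.crc32(payload)
length   = 1 + 4 + payload.bytesize         # magic + checksum + payload, excluding the length field itself

# 'N' = 4-byte big-endian unsigned int, 'C' = 1-byte unsigned int
encoded = [length, 0, checksum].pack('NCN') + payload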

Constant Summary

MAGIC_IDENTIFIER_DEFAULT = 0
BASIC_MESSAGE_HEADER = 'NC'.freeze
VERSION_0_HEADER = 'N'.freeze
VERSION_1_HEADER = 'CN'.freeze
COMPRESSION_CODEC_MASK = 0x03
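
The header constants are Ruby pack/unpack format strings, and COMPRESSION_CODEC_MASK is a bit mask. A small sketch (with assumed example bytes) of what each one reads:

length, magic = "\x00\x00\x00\x10\x00".unpack('NC')   # BASIC_MESSAGE_HEADER: 4-byte size, 1-byte magic
checksum      = "\x00\x00\x00\x2A".unpack('N').first  # VERSION_0_HEADER: 4-byte CRC32
attrs, crc    = "\x01\x00\x00\x00\x2A".unpack('CN')   # VERSION_1_HEADER: 1-byte attributes, 4-byte CRC32
codec         = attrs & 0x03                          # COMPRESSION_CODEC_MASK keeps the codec bits => 1 (gzip)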

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message

Returns a new instance of Message.



# File 'lib/kafka/message.rb', line 43

def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
  self.magic    = magic
  self.payload  = payload || ""
  self.checksum = checksum || self.calculate_checksum
end
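
A usage sketch (the payload is an assumption): when no checksum is given, it is calculated from the payload.

message = Kafka::Message.new("hello kafka")
message.magic     # => 0
message.payload   # => "hello kafka"
message.checksum  # => Zlib.crc32("hello kafka")
message.valid?    # => true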

Instance Attribute Details

#checksum ⇒ Object

Returns the value of attribute checksum.



# File 'lib/kafka/message.rb', line 41

def checksum
  @checksum
end

#magic ⇒ Object

Returns the value of attribute magic.



# File 'lib/kafka/message.rb', line 41

def magic
  @magic
end

#payload ⇒ Object

Returns the value of attribute payload.



# File 'lib/kafka/message.rb', line 41

def payload
  @payload
end

Class Method Details

.parse_from(data) ⇒ Object

Takes a byte string containing one or more messages; returns a MessageSet with the messages parsed from the string, and the number of bytes consumed from the string.



# File 'lib/kafka/message.rb', line 60

def self.parse_from(data)
  messages = []
  bytes_processed = 0

  while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
    message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
    break if bytes_processed + message_size + 4 > data.length # message is truncated

    case magic
    when 0
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9      ...
      # |                       |     |                       |
      # |      message_size     |magic|        checksum       | payload ...
      payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
      checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
      payload  = data[bytes_processed + 9, payload_size]
      messages << Kafka::Message.new(payload, magic, checksum)

    when 1
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  | 10      ...
      # |                       |     |     |                       |
      # |         size          |magic|attrs|        checksum       | payload ...
      payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
      attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
      payload = data[bytes_processed + 10, payload_size]

      case attributes & COMPRESSION_CODEC_MASK
      when 0 # a single uncompressed message
        messages << Kafka::Message.new(payload, magic, checksum)
      when 1 # a gzip-compressed message set -- parse recursively
        uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
        message_set = parse_from(uncompressed)
        raise 'malformed compressed message' if message_set.size != uncompressed.size
        messages.concat(message_set.messages)
      else
        # https://cwiki.apache.org/confluence/display/KAFKA/Compression
        # claims that 2 is for Snappy compression, but Kafka's Scala client
        # implementation doesn't seem to support it yet, so I don't have
        # a reference implementation to test against.
        raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
      end

    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end

    bytes_processed += message_size + 4 # 4 = sizeof(message_size)
  end

  MessageSet.new(bytes_processed, messages)
end
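
A usage sketch of parse_from, feeding it a hand-packed version-0 message (the payload and byte layout are assumptions following the overview above):

require 'zlib'

payload = "hello kafka"
data = [1 + 4 + payload.bytesize, 0, Zlib.crc32(payload), payload].pack('NCNa*')

message_set = Kafka::Message.parse_from(data)
message_set.size                    # => 20, bytes consumed including the 4-byte size field
message_set.messages.first.payload  # => "hello kafka"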

Instance Method Details

#calculate_checksum ⇒ Object

Calculates the CRC32 checksum of the payload.

# File 'lib/kafka/message.rb', line 49

def calculate_checksum
  Zlib.crc32(self.payload)
end

#valid? ⇒ Boolean

Returns true if the stored checksum matches a freshly calculated CRC32 of the payload.

Returns:

  • (Boolean)


# File 'lib/kafka/message.rb', line 53

def valid?
  self.checksum == calculate_checksum
end
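
A usage sketch showing how valid? compares the stored checksum against a fresh CRC32 of the payload (values are assumptions):

message = Kafka::Message.new("hello kafka")
message.valid?     # => true

message.payload = "tampered"        # stored checksum no longer matches the payload
message.valid?     # => false

message.checksum = message.calculate_checksum
message.valid?     # => true again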