Class: Kafka::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/message.rb

Overview

A message. The format of a message is as follows:

4 byte big-endian int: length of message in bytes (including the rest of the header, but excluding the length field itself)

1 byte: “magic” identifier (format version number)

If the magic byte == 0, there is one more header field:

4 byte big-endian int: CRC32 checksum of the payload

If the magic byte == 1, there are two more header fields:

1 byte: “attributes” (flags; the low two bits, COMPRESSION_CODEC_MASK, select the compression codec)

4 byte big-endian int: CRC32 checksum of the payload

All following bytes are the payload.

Defined Under Namespace

Classes: MessageSet

Constant Summary collapse

MAGIC_IDENTIFIER_DEFAULT =
0
MAGIC_IDENTIFIER_COMPRESSION =
1
NO_COMPRESSION =
0
GZIP_COMPRESSION =
1
SNAPPY_COMPRESSION =
2
BASIC_MESSAGE_HEADER =
'NC'.freeze
VERSION_0_HEADER =
'N'.freeze
VERSION_1_HEADER =
'CN'.freeze
COMPRESSION_CODEC_MASK =
0x03

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message

Returns a new instance of Message.



47
48
49
50
51
52
# File 'lib/kafka/message.rb', line 47

# Builds a message around +payload+.
#
# @param payload [String, nil] message body; a nil (or false) payload is
#   replaced by an empty string
# @param magic [Integer] header format version (the "magic" byte)
# @param checksum [Integer, nil] CRC32 to record for the payload; when nil
#   (or false) it is computed from the payload via #calculate_checksum
def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
  @compression = NO_COMPRESSION
  self.magic = magic
  self.payload = payload || ""
  self.checksum = checksum || calculate_checksum
end

Instance Attribute Details

#checksum ⇒ Object

Returns the value of attribute checksum.



45
46
47
# File 'lib/kafka/message.rb', line 45

def checksum
  @checksum
end

#magic ⇒ Object

Returns the value of attribute magic.



45
46
47
# File 'lib/kafka/message.rb', line 45

# Header format version ("magic" byte) of this message, as last assigned.
def magic; @magic; end

#payload ⇒ Object

Returns the value of attribute payload.



45
46
47
# File 'lib/kafka/message.rb', line 45

# Message body, as last assigned (the constructor substitutes "" for nil).
def payload; @payload; end

Class Method Details

.ensure_snappy! ⇒ Object



137
138
139
140
141
142
143
# File 'lib/kafka/message.rb', line 137

# Runs the given block only if a top-level Snappy constant is defined.
#
# @return the block's return value
# @raise [RuntimeError] "Snappy not available!" when Snappy is not loaded
def self.ensure_snappy!
  fail "Snappy not available!" unless Object.const_defined?(:Snappy)

  yield
end

.parse_from(data) ⇒ Object

Takes a byte string containing one or more messages and returns a MessageSet holding the number of bytes consumed from the string and the messages parsed from it. A truncated message at the end of the string is not consumed.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/kafka/message.rb', line 65

# Parses a byte string containing back-to-back messages.
#
# Each message starts with a 4-byte big-endian size and a 1-byte magic
# number; the rest of the header depends on the magic (see the diagrams
# below). A message whose attributes select gzip or snappy compression
# carries a nested message set in its payload. That set is decompressed,
# parsed recursively, and its messages are flattened into the result.
#
# Parsing stops at the first message not fully contained in +data+, so a
# truncated trailing message is left unconsumed.
#
# @param data [String] raw message bytes; offsets are treated as byte
#   offsets whatever the string's encoding tag
# @return [MessageSet] MessageSet.new(bytes consumed, parsed messages)
# @raise [RuntimeError] on an unsupported magic number or compression codec,
#   or when a decompressed message set is not consumed exactly
def self.parse_from(data)
  # Every offset below is a byte offset, but String#length and String#[]
  # count characters. Decompressed data (GzipReader#read) is tagged with
  # Encoding.default_external, so a multibyte sequence inside a payload
  # would shift every later offset. Parse a binary view instead.
  if data.respond_to?(:force_encoding) && data.encoding != Encoding::ASCII_8BIT
    data = data.dup.force_encoding(Encoding::ASCII_8BIT)
  end

  messages = []
  bytes_processed = 0

  # Parses a decompressed inner message set and appends its messages. The
  # inner set must be consumed exactly; leftover bytes mean the wrapper is
  # corrupt. Compare in bytes, because that is what parse_from counts.
  append_nested = lambda do |uncompressed|
    message_set = parse_from(uncompressed)
    raise 'malformed compressed message' if message_set.size != uncompressed.bytesize
    messages.concat(message_set.messages)
  end

  while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
    message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
    break if bytes_processed + message_size + 4 > data.length # message is truncated

    case magic
    when MAGIC_IDENTIFIER_DEFAULT
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9      ...
      # |                       |     |                       |
      # |      message_size     |magic|        checksum       | payload ...
      payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
      checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
      payload  = data[bytes_processed + 9, payload_size]
      messages << Kafka::Message.new(payload, magic, checksum)

    when MAGIC_IDENTIFIER_COMPRESSION
      # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  | 10      ...
      # |                       |     |     |                       |
      # |         size          |magic|attrs|        checksum       | payload ...
      payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
      attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
      payload = data[bytes_processed + 10, payload_size]
      codec = attributes & COMPRESSION_CODEC_MASK

      case codec
      when NO_COMPRESSION # a single uncompressed message
        messages << Kafka::Message.new(payload, magic, checksum)
      when GZIP_COMPRESSION # a gzip-compressed message set -- parse recursively
        # The block form of wrap closes the reader once the data is read.
        append_nested.call(Zlib::GzipReader.wrap(StringIO.new(payload)) { |gz| gz.read })
      when SNAPPY_COMPRESSION # a snappy-compressed message set -- parse recursively
        ensure_snappy! do
          append_nested.call(Snappy::Reader.new(StringIO.new(payload)).read)
        end
      else
        # https://cwiki.apache.org/confluence/display/KAFKA/Compression
        raise "Unsupported Kafka compression codec: #{codec}"
      end

    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end

    bytes_processed += message_size + 4 # 4 = sizeof(message_size)
  end

  MessageSet.new(bytes_processed, messages)
end

Instance Method Details

#calculate_checksum ⇒ Object



54
55
56
# File 'lib/kafka/message.rb', line 54

# CRC32 (Zlib.crc32) of the message's current payload.
#
# @return [Integer]
def calculate_checksum
  body = payload
  Zlib.crc32(body)
end

#encode(compression = NO_COMPRESSION) ⇒ Object



121
122
123
124
125
126
127
128
129
# File 'lib/kafka/message.rb', line 121

# Serializes the message into its wire form. The output is a 4-byte
# big-endian length prefix, then the header bytes from
# #magic_and_compression, then the CRC32 of the (possibly compressed)
# payload, then the payload itself.
#
# The payload helpers (#asciify_payload, #compress_payload) take no
# arguments and presumably read the payload through the accessor, so the
# transformed bytes are staged in +payload+ while encoding. The original
# payload is restored afterwards. Without the restore, a second encode with
# compression would compress already-compressed bytes, and #valid? would
# compare the stored checksum against them.
#
# @param compression [Integer] one of the *_COMPRESSION constants; recorded
#   in @compression
# @return [String] the encoded message bytes
def encode(compression = NO_COMPRESSION)
  @compression = compression
  original_payload = payload

  begin
    self.payload = asciify_payload
    self.payload = compress_payload if compression?

    data = magic_and_compression + [calculate_checksum].pack("N") + payload
    # The length prefix counts bytes, not characters.
    [data.bytesize].pack("N") + data
  ensure
    self.payload = original_payload
  end
end

#ensure_snappy!(&block) ⇒ Object



145
146
147
# File 'lib/kafka/message.rb', line 145

# Instance-level shortcut: forwards the block to the class-level
# ensure_snappy! and returns whatever that returns.
def ensure_snappy!(&block)
  klass = self.class
  klass.ensure_snappy!(&block)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/kafka/message.rb', line 58

# Whether the stored checksum matches a CRC32 freshly computed over the
# current payload.
#
# @return [Boolean]
def valid?
  calculate_checksum == checksum
end