Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

API Specification

Message => Crc MagicByte Attributes Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes

Constant Summary collapse

MAGIC_BYTE =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1) ⇒ Message

Returns a new instance of Message.



23
24
25
26
27
28
29
30
31
# File 'lib/kafka/protocol/message.rb', line 23

# Builds a message from its payload and metadata and caches its byte size.
#
# @param value the message payload (required)
# @param key the message key; nil when the message has no key
# @param create_time defaults to the time at which the message is constructed
# @param codec_id defaults to 0
# @param offset defaults to -1
def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
  @key, @value = key, value
  @codec_id, @offset, @create_time = codec_id, offset, create_time

  # A nil key or value contributes zero bytes, since nil.to_s is the empty string.
  @bytesize = [@key, @value].sum { |part| part.to_s.bytesize }
end

Instance Attribute Details

#bytesize ⇒ Object (readonly)

Returns the value of attribute bytesize.



21
22
23
# File 'lib/kafka/protocol/message.rb', line 21

# Returns the size computed by the constructor: the byte length of the key's
# string form plus the byte length of the value's string form.
def bytesize
  @bytesize
end

#codec_id ⇒ Object (readonly)

Returns the value of attribute codec_id.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

# Returns the codec id this message was constructed with (0 unless one was
# given). When the message comes from .decode, this is the three least
# significant bits of the attributes byte.
def codec_id
  @codec_id
end

#create_time ⇒ Object (readonly)

Returns the value of attribute create_time.



21
22
23
# File 'lib/kafka/protocol/message.rb', line 21

# Returns the create time passed to the constructor, which defaults to
# Time.now when none is given.
def create_time
  @create_time
end

#key ⇒ Object (readonly)

Returns the value of attribute key.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

# Returns the message key, or nil when the message was built without one.
def key
  @key
end

#offset ⇒ Object (readonly)

Returns the value of attribute offset.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

# Returns the offset passed to the constructor (-1 unless one was given).
# .decode fills it from the int64 that precedes the message body.
def offset
  @offset
end

#value ⇒ Object (readonly)

Returns the value of attribute value.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

# Returns the message value exactly as it was passed to the constructor.
def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/kafka/protocol/message.rb', line 62

# Decodes a single message from +decoder+.
#
# Reads an int64 offset and a bytes field from +decoder+, then parses that
# bytes field as: int32 CRC, int8 magic byte, int8 attributes, key bytes and
# value bytes. No create time is passed on, so the new message falls back to
# the constructor's default.
#
# @param decoder an object responding to #int64 and #bytes
# @return the message built from the decoded key, value, codec id and offset
# @raise [Kafka::Error] if the magic byte is not MAGIC_BYTE
def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  # The CRC has to be consumed to reach the fields that follow it, but it is
  # not verified here; the underscore marks the local as intentionally unused.
  _crc = message_decoder.int32
  magic_byte = message_decoder.int8

  unless magic_byte == MAGIC_BYTE
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  attributes = message_decoder.int8
  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  new(key: key, value: value, codec_id: codec_id, offset: offset)
end

Instance Method Details

#==(other) ⇒ Object



40
41
42
43
44
45
# File 'lib/kafka/protocol/message.rb', line 40

# Compares this message with +other+ by key, value, codec id and offset.
# The create time and cached byte size take no part in the comparison.
#
# @param other the object to compare against
# @return true when all four attributes match; a falsy value otherwise,
#   including when +other+ does not expose the compared readers
def ==(other)
  # Objects without these readers (nil, strings, ...) are simply not equal;
  # without this guard the comparison would raise NoMethodError.
  return false unless %i[key value codec_id offset].all? { |reader| other.respond_to?(reader) }

  @key == other.key &&
    @value == other.value &&
    @codec_id == other.codec_id &&
    @offset == other.offset
end

#compressed? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/kafka/protocol/message.rb', line 47

# Reports whether a compression codec is set on this message.
#
# @return [Boolean] true when the codec id is anything other than 0
def compressed?
  @codec_id != 0
end

#decompress ⇒ Kafka::Protocol::MessageSet



52
53
54
55
56
57
58
59
60
# File 'lib/kafka/protocol/message.rb', line 52

# Decompresses this message's value with the codec registered for its codec
# id, then decodes the decompressed data as a message set.
#
# @return whatever MessageSet.decode returns for the decompressed data
def decompress
  decompressor = Compression.find_codec_by_id(@codec_id)

  # NOTE(review): an earlier comment here said the first 20 bytes had to be
  # cut out, but nothing is sliced in this method — confirm whether the codec
  # takes care of that or the remark was stale.
  inflated = decompressor.decompress(value)

  MessageSet.decode(Decoder.from_string(inflated))
end

#encode(encoder) ⇒ Object



33
34
35
36
37
38
# File 'lib/kafka/protocol/message.rb', line 33

# Writes this message to +encoder+: the offset as an int64, followed by the
# output of encode_with_crc as a bytes field.
#
# @param encoder an object responding to #write_int64 and #write_bytes
# @return whatever encoder.write_bytes returns
def encode(encoder)
  # Build the payload before touching the encoder, so nothing has been
  # written if building it raises.
  payload = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(payload)
end