Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

API Specification

Message => Crc MagicByte Attributes Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes

Constant Summary collapse

MAGIC_BYTE =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, attributes: 0, offset: -1)) ⇒ Message

Returns a new instance of Message.



23
24
25
26
27
28
29
30
# File 'lib/kafka/protocol/message.rb', line 23

def initialize(value:, key: nil, attributes: 0, offset: -1)
  @key = key
  @value = value
  @attributes = attributes
  @offset = offset

  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end

Instance Attribute Details

#attributesObject (readonly)

Returns the value of attribute attributes.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def attributes
  @attributes
end

#bytesizeObject (readonly)

Returns the value of attribute bytesize.



21
22
23
# File 'lib/kafka/protocol/message.rb', line 21

def bytesize
  @bytesize
end

#keyObject (readonly)

Returns the value of attribute key.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def key
  @key
end

#offsetObject (readonly)

Returns the value of attribute offset.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def offset
  @offset
end

#valueObject (readonly)

Returns the value of attribute value.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/kafka/protocol/message.rb', line 61

def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  crc = message_decoder.int32
  magic_byte = message_decoder.int8

  unless magic_byte == MAGIC_BYTE
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  attributes = message_decoder.int8
  key = message_decoder.bytes
  value = message_decoder.bytes

  new(key: key, value: value, attributes: attributes, offset: offset)
end

Instance Method Details

#==(other) ⇒ Object



39
40
41
42
43
44
# File 'lib/kafka/protocol/message.rb', line 39

def ==(other)
  @key == other.key &&
    @value == other.value &&
    @attributes == other.attributes &&
    @offset == other.offset
end

#compressed?Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/kafka/protocol/message.rb', line 46

def compressed?
  @attributes != 0
end

#decompressKafka::Protocol::MessageSet



51
52
53
54
55
56
57
58
59
# File 'lib/kafka/protocol/message.rb', line 51

def decompress
  codec = Compression.find_codec_by_id(@attributes)

  # For some weird reason we need to cut out the first 20 bytes.
  data = codec.decompress(value)
  message_set_decoder = Decoder.from_string(data)

  MessageSet.decode(message_set_decoder)
end

#encode(encoder) ⇒ Object



32
33
34
35
36
37
# File 'lib/kafka/protocol/message.rb', line 32

def encode(encoder)
  data = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(data)
end