Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

API Specification

Message => Crc MagicByte Attributes Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes

Constant Summary collapse

MAGIC_BYTE =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, attributes: 0, offset: -1)) ⇒ Message

Returns a new instance of Message.



21
22
23
24
25
26
# File 'lib/kafka/protocol/message.rb', line 21

def initialize(value:, key: nil, attributes: 0, offset: -1)
  @key = key
  @value = value
  @attributes = attributes
  @offset = offset
end

Instance Attribute Details

#attributesObject (readonly)

Returns the value of attribute attributes.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def attributes
  @attributes
end

#keyObject (readonly)

Returns the value of attribute key.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def key
  @key
end

#offsetObject (readonly)

Returns the value of attribute offset.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def offset
  @offset
end

#valueObject (readonly)

Returns the value of attribute value.



19
20
21
# File 'lib/kafka/protocol/message.rb', line 19

def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/kafka/protocol/message.rb', line 57

def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  crc = message_decoder.int32
  magic_byte = message_decoder.int8

  unless magic_byte == MAGIC_BYTE
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  attributes = message_decoder.int8
  key = message_decoder.bytes
  value = message_decoder.bytes

  new(key: key, value: value, attributes: attributes, offset: offset)
end

Instance Method Details

#==(other) ⇒ Object



35
36
37
38
39
40
# File 'lib/kafka/protocol/message.rb', line 35

def ==(other)
  @key == other.key &&
    @value == other.value &&
    @attributes == other.attributes &&
    @offset == other.offset
end

#compressed?Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/kafka/protocol/message.rb', line 42

def compressed?
  @attributes != 0
end

#decompressKafka::Protocol::MessageSet



47
48
49
50
51
52
53
54
55
# File 'lib/kafka/protocol/message.rb', line 47

def decompress
  codec = Compression.find_codec_by_id(@attributes)

  # For some weird reason we need to cut out the first 20 bytes.
  data = codec.decompress(value)
  message_set_decoder = Decoder.from_string(data)

  MessageSet.decode(message_set_decoder)
end

#encode(encoder) ⇒ Object



28
29
30
31
32
33
# File 'lib/kafka/protocol/message.rb', line 28

def encode(encoder)
  data = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(data)
end