Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

API Specification

Message => Crc MagicByte Attributes Timestamp Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64, in ms
    Key => bytes
    Value => bytes

Constant Summary collapse

MAGIC_BYTE =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)) ⇒ Message

Returns a new instance of Message.



24
25
26
27
28
29
30
31
32
# File 'lib/kafka/protocol/message.rb', line 24

def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
  @key = key
  @value = value
  @codec_id = codec_id
  @offset = offset
  @create_time = create_time

  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end

Instance Attribute Details

#bytesizeObject (readonly)

Returns the value of attribute bytesize.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def bytesize
  @bytesize
end

#codec_idObject (readonly)

Returns the value of attribute codec_id.



20
21
22
# File 'lib/kafka/protocol/message.rb', line 20

def codec_id
  @codec_id
end

#create_timeObject (readonly)

Returns the value of attribute create_time.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def create_time
  @create_time
end

#keyObject (readonly)

Returns the value of attribute key.



20
21
22
# File 'lib/kafka/protocol/message.rb', line 20

def key
  @key
end

#offsetObject (readonly)

Returns the value of attribute offset.



20
21
22
# File 'lib/kafka/protocol/message.rb', line 20

def offset
  @offset
end

#valueObject (readonly)

Returns the value of attribute value.



20
21
22
# File 'lib/kafka/protocol/message.rb', line 20

def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/kafka/protocol/message.rb', line 75

def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  crc = message_decoder.int32
  magic_byte = message_decoder.int8
  attributes = message_decoder.int8

  # The magic byte indicates the message format version. There are situations
  # where an old message format can be returned from a newer version of Kafka,
  # because old messages are not necessarily rewritten on upgrades.
  case magic_byte
  when 0
    # No timestamp in the pre-0.10 message format.
    timestamp = nil
  when 1
    timestamp = message_decoder.int64

    # If the timestamp is set to zero, it's because the message has been upgraded
    # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
    # have a timestamp attribute, so we'll just set the timestamp to nil.
    timestamp = nil if timestamp.zero?
  else
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
  create_time = timestamp && Time.at(timestamp / 1000.0)

  new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
end

Instance Method Details

#==(other) ⇒ Object



41
42
43
44
45
46
# File 'lib/kafka/protocol/message.rb', line 41

def ==(other)
  @key == other.key &&
    @value == other.value &&
    @codec_id == other.codec_id &&
    @offset == other.offset
end

#compressed?Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/kafka/protocol/message.rb', line 48

def compressed?
  @codec_id != 0
end

#decompressKafka::Protocol::MessageSet



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/kafka/protocol/message.rb', line 53

def decompress
  codec = Compression.find_codec_by_id(@codec_id)

  # For some weird reason we need to cut out the first 20 bytes.
  data = codec.decompress(value)
  message_set_decoder = Decoder.from_string(data)
  message_set = MessageSet.decode(message_set_decoder)

  # The contained messages need to have their offset corrected.
  messages = message_set.messages.each_with_index.map do |message, i|
    Message.new(
      offset: offset + i,
      value: message.value,
      key: message.key,
      create_time: message.create_time,
      codec_id: message.codec_id
    )
  end

  MessageSet.new(messages: messages)
end

#encode(encoder) ⇒ Object



34
35
36
37
38
39
# File 'lib/kafka/protocol/message.rb', line 34

def encode(encoder)
  data = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(data)
end