Class: Kafka::Protocol::Message
- Inherits:
-
Object
- Object
- Kafka::Protocol::Message
- Defined in:
- lib/kafka/protocol/message.rb
Overview
API Specification
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
Constant Summary collapse
- MAGIC_BYTE =
0
Instance Attribute Summary collapse
-
#bytesize ⇒ Object
readonly
Returns the value of attribute bytesize.
-
#codec_id ⇒ Object
readonly
Returns the value of attribute codec_id.
-
#create_time ⇒ Object
readonly
Returns the value of attribute create_time.
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#offset ⇒ Object
readonly
Returns the value of attribute offset.
-
#value ⇒ Object
readonly
Returns the value of attribute value.
Class Method Summary collapse
Instance Method Summary collapse
- #==(other) ⇒ Object
- #compressed? ⇒ Boolean
- #decompress ⇒ Kafka::Protocol::MessageSet
- #encode(encoder) ⇒ Object
-
#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)) ⇒ Message
constructor
A new instance of Message.
Constructor Details
#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)) ⇒ Message
Returns a new instance of Message.
23 24 25 26 27 28 29 30 31 |
# File 'lib/kafka/protocol/message.rb', line 23 def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1) @key = key @value = value @codec_id = codec_id @offset = offset @create_time = create_time @bytesize = @key.to_s.bytesize + @value.to_s.bytesize end |
Instance Attribute Details
#bytesize ⇒ Object (readonly)
Returns the value of attribute bytesize.
21 22 23 |
# File 'lib/kafka/protocol/message.rb', line 21 def bytesize @bytesize end |
#codec_id ⇒ Object (readonly)
Returns the value of attribute codec_id.
19 20 21 |
# File 'lib/kafka/protocol/message.rb', line 19 def codec_id @codec_id end |
#create_time ⇒ Object (readonly)
Returns the value of attribute create_time.
21 22 23 |
# File 'lib/kafka/protocol/message.rb', line 21 def create_time @create_time end |
#key ⇒ Object (readonly)
Returns the value of attribute key.
19 20 21 |
# File 'lib/kafka/protocol/message.rb', line 19 def key @key end |
#offset ⇒ Object (readonly)
Returns the value of attribute offset.
19 20 21 |
# File 'lib/kafka/protocol/message.rb', line 19 def offset @offset end |
#value ⇒ Object (readonly)
Returns the value of attribute value.
19 20 21 |
# File 'lib/kafka/protocol/message.rb', line 19 def value @value end |
Class Method Details
.decode(decoder) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/kafka/protocol/message.rb', line 62 def self.decode(decoder) offset = decoder.int64 = Decoder.from_string(decoder.bytes) crc = .int32 magic_byte = .int8 unless magic_byte == MAGIC_BYTE raise Kafka::Error, "Invalid magic byte: #{magic_byte}" end attributes = .int8 key = .bytes value = .bytes # The codec id is encoded in the three least significant bits of the # attributes. codec_id = attributes & 0b111 new(key: key, value: value, codec_id: codec_id, offset: offset) end |
Instance Method Details
#==(other) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/kafka/protocol/message.rb', line 40 def ==(other) @key == other.key && @value == other.value && @codec_id == other.codec_id && @offset == other.offset end |
#compressed? ⇒ Boolean
47 48 49 |
# File 'lib/kafka/protocol/message.rb', line 47 def compressed? @codec_id != 0 end |
#decompress ⇒ Kafka::Protocol::MessageSet
52 53 54 55 56 57 58 59 60 |
# File 'lib/kafka/protocol/message.rb', line 52 def decompress codec = Compression.find_codec_by_id(@codec_id) # For some weird reason we need to cut out the first 20 bytes. data = codec.decompress(value) = Decoder.from_string(data) MessageSet.decode() end |
#encode(encoder) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/kafka/protocol/message.rb', line 33 def encode(encoder) data = encode_with_crc encoder.write_int64(offset) encoder.write_bytes(data) end |