Class: Kafka::Protocol::Message
- Inherits:
- Object
  - Object
    - Kafka::Protocol::Message
- Defined in:
- lib/kafka/protocol/message.rb
Overview
API Specification
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
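The grammar above can be sketched with the Ruby standard library alone. This is a minimal illustration, not the library's own encoder: the helper names `encode_bytes` and `encode_message` are hypothetical, and the sketch assumes the usual Kafka conventions that `bytes` is an int32 length prefix followed by the payload (length -1 for nil) and that Crc is the CRC-32 of everything after the Crc field.

```ruby
require "zlib"

# Hypothetical helper: Kafka-style `bytes` is an int32 length plus the payload.
# A nil value is assumed to be encoded as length -1 with no payload.
def encode_bytes(str)
  return [-1].pack("l>") if str.nil?

  data = str.b
  [data.bytesize].pack("l>") + data
end

# Hypothetical helper following: Message => Crc MagicByte Attributes Key Value
def encode_message(value:, key: nil, attributes: 0)
  # MagicByte (0) and Attributes are int8; Key and Value are `bytes`.
  body = [0, attributes].pack("cc") + encode_bytes(key) + encode_bytes(value)

  # Crc is assumed to cover every field that follows it.
  crc = Zlib.crc32(body)
  [crc].pack("N") + body
end

encoded = encode_message(key: "k", value: "hello")
```

The CRC is packed as an unsigned 32-bit integer here; the bit pattern on the wire is the same as for the signed int32 named in the grammar.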
Constant Summary
- MAGIC_BYTE = 0
Instance Attribute Summary
- #attributes ⇒ Object (readonly): Returns the value of attribute attributes.
- #key ⇒ Object (readonly): Returns the value of attribute key.
- #offset ⇒ Object (readonly): Returns the value of attribute offset.
- #value ⇒ Object (readonly): Returns the value of attribute value.
Class Method Summary
- .decode(decoder) ⇒ Object
Instance Method Summary
- #==(other) ⇒ Object
- #compressed? ⇒ Boolean
- #decompress ⇒ Kafka::Protocol::MessageSet
- #encode(encoder) ⇒ Object
- #initialize(value:, key: nil, attributes: 0, offset: -1) ⇒ Message (constructor): A new instance of Message.
Constructor Details
#initialize(value:, key: nil, attributes: 0, offset: -1) ⇒ Message
Returns a new instance of Message.

    # File 'lib/kafka/protocol/message.rb', line 21

    def initialize(value:, key: nil, attributes: 0, offset: -1)
      @key = key
      @value = value
      @attributes = attributes
      @offset = offset
    end
Instance Attribute Details
#attributes ⇒ Object (readonly)
Returns the value of attribute attributes.

    # File 'lib/kafka/protocol/message.rb', line 19

    def attributes
      @attributes
    end
#key ⇒ Object (readonly)
Returns the value of attribute key.

    # File 'lib/kafka/protocol/message.rb', line 19

    def key
      @key
    end
#offset ⇒ Object (readonly)
Returns the value of attribute offset.

    # File 'lib/kafka/protocol/message.rb', line 19

    def offset
      @offset
    end
#value ⇒ Object (readonly)
Returns the value of attribute value.

    # File 'lib/kafka/protocol/message.rb', line 19

    def value
      @value
    end
Class Method Details
.decode(decoder) ⇒ Object

    # File 'lib/kafka/protocol/message.rb', line 57

    def self.decode(decoder)
      offset = decoder.int64
      message_decoder = Decoder.from_string(decoder.bytes)

      crc = message_decoder.int32
      magic_byte = message_decoder.int8

      unless magic_byte == MAGIC_BYTE
        raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
      end

      attributes = message_decoder.int8
      key = message_decoder.bytes
      value = message_decoder.bytes

      new(key: key, value: value, attributes: attributes, offset: offset)
    end
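The decoding steps listed for `.decode` can be traced with `StringIO` and `String#unpack` in place of the library's decoder. This is only a sketch under stated assumptions: `read_bytes` and `decode_entry` are hypothetical names, the result is a plain Hash rather than a `Message`, and, like the listing, it reads the CRC without verifying it.

```ruby
require "stringio"

# Hypothetical reader for Kafka-style `bytes`: int32 length, then the payload.
# A length of -1 is assumed to mean nil.
def read_bytes(io)
  size = io.read(4).unpack("l>").first
  size == -1 ? nil : io.read(size)
end

# Hypothetical stand-in for the decode steps; returns a Hash, not a Message.
def decode_entry(io)
  offset = io.read(8).unpack("q>").first  # int64 offset
  message = StringIO.new(read_bytes(io))  # size-prefixed message body

  crc = message.read(4).unpack("N").first # read but not checked here
  magic_byte = message.read(1).unpack("c").first
  raise ArgumentError, "Invalid magic byte: #{magic_byte}" unless magic_byte == 0

  attributes = message.read(1).unpack("c").first
  key = read_bytes(message)
  value = read_bytes(message)

  { offset: offset, crc: crc, attributes: attributes, key: key, value: value }
end

# Hand-built entry: offset 7, nil key, value "hi", CRC field left as 0.
body = [0, 0, 0].pack("Ncc") + [-1].pack("l>") + [2].pack("l>") + "hi"
entry = [7].pack("q>") + [body.bytesize].pack("l>") + body
decoded = decode_entry(StringIO.new(entry))
```

A stricter reader could compare the stored CRC with `Zlib.crc32` of the remaining message bytes before accepting the message.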
Instance Method Details
#==(other) ⇒ Object

    # File 'lib/kafka/protocol/message.rb', line 35

    def ==(other)
      @key == other.key &&
        @value == other.value &&
        @attributes == other.attributes &&
        @offset == other.offset
    end
#compressed? ⇒ Boolean

    # File 'lib/kafka/protocol/message.rb', line 42

    def compressed?
      @attributes != 0
    end
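`#compressed?` treats any nonzero attributes byte as compressed. A slightly stricter variant would look only at the bits that carry the codec id. The sketch below assumes that the codec id lives in the low three bits of Attributes and that ids 1 to 3 map to gzip, snappy, and lz4; the constants and method names are hypothetical and are not part of this class.

```ruby
# Assumed mask: the low three bits of Attributes hold the codec id.
CODEC_MASK = 0b111

# Assumed id-to-name mapping, for illustration only.
CODEC_NAMES = { 0 => :none, 1 => :gzip, 2 => :snappy, 3 => :lz4 }.freeze

# Extract the codec id, ignoring any higher attribute bits.
def codec_id(attributes)
  attributes & CODEC_MASK
end

# True when the codec id is anything other than 0 (uncompressed).
def compressed?(attributes)
  codec_id(attributes) != 0
end
```

With this variant, an attributes byte that sets only higher bits would not be reported as compressed, which differs from the `@attributes != 0` check in the listing.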
#decompress ⇒ Kafka::Protocol::MessageSet

    # File 'lib/kafka/protocol/message.rb', line 47

    def decompress
      codec = Compression.find_codec_by_id(@attributes)

      # For some weird reason we need to cut out the first 20 bytes.
      data = codec.decompress(value)
      message_set_decoder = Decoder.from_string(data)

      MessageSet.decode(message_set_decoder)
    end
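The idea behind `#decompress` is that the value of a compressed message holds an entire encoded message set in compressed form. The round trip below illustrates this with gzip from the standard library. It is a sketch only: `gzip` and `gunzip` are hypothetical helpers standing in for whatever codec `Compression.find_codec_by_id` returns, and the inner bytes are a placeholder rather than a real encoded message set.

```ruby
require "zlib"
require "stringio"

# Hypothetical helper: gzip a binary string in memory.
def gzip(data)
  buffer = StringIO.new("".b)
  writer = Zlib::GzipWriter.new(buffer)
  writer.write(data)
  writer.finish # flush the gzip trailer without closing the buffer
  buffer.string
end

# Hypothetical helper: reverse of gzip, returning a binary string.
def gunzip(data)
  Zlib::GzipReader.new(StringIO.new(data)).read.b
end

# Placeholder for the bytes of an encoded inner message set.
inner_message_set = "placeholder for encoded messages".b

# The wrapper message would carry the compressed bytes as its value.
wrapper_value = gzip(inner_message_set)

# Decompressing the wrapper value yields the inner bytes again,
# which would then be handed to a message set decoder.
restored = gunzip(wrapper_value)
```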
#encode(encoder) ⇒ Object

    # File 'lib/kafka/protocol/message.rb', line 28

    def encode(encoder)
      data = encode_with_crc

      encoder.write_int64(offset)
      encoder.write_bytes(data)
    end