Class: Kafka::Protocol::Message
- Inherits: Object
    - Object
    - Kafka::Protocol::Message
- Defined in: lib/kafka/protocol/message.rb
Overview
## API Specification
Message => Crc MagicByte Attributes Timestamp Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64, in ms
    Key => bytes
    Value => bytes
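The layout above can be sketched with `Array#pack` and `Zlib.crc32` from the Ruby standard library. This is a minimal illustration, not the library's encoder: `pack_bytes` and `pack_message` are hypothetical helpers, and the sketch assumes the usual Kafka conventions that all integers are big-endian, that `bytes` fields carry an int32 length prefix (with -1 for nil), and that the CRC covers everything after the Crc field.

```ruby
require "zlib"

# Hypothetical helper: a nullable byte string as int32 length + data,
# with -1 as the length of a nil value.
def pack_bytes(str)
  return [-1].pack("l>") if str.nil?

  bytes = str.b
  [bytes.bytesize].pack("l>") + bytes
end

# Hypothetical helper (not part of ruby-kafka): lays out
# Crc MagicByte Attributes Timestamp Key Value in that order.
def pack_message(key:, value:, timestamp_ms:, attributes: 0, magic_byte: 1)
  body = [magic_byte, attributes].pack("cc") # two int8 fields
  body << [timestamp_ms].pack("q>")          # int64, milliseconds
  body << pack_bytes(key)
  body << pack_bytes(value)

  crc = Zlib.crc32(body)                     # checksum of everything after Crc
  [crc].pack("N") + body
end

packed = pack_message(key: "k", value: "hello", timestamp_ms: 1_500_000_000_000)
puts packed.bytesize
```

For a one-byte key and a five-byte value the result is 4 + 1 + 1 + 8 + (4 + 1) + (4 + 5) bytes long.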
Constant Summary
- MAGIC_BYTE = 1
Instance Attribute Summary
- #bytesize ⇒ Object (readonly)
  Returns the value of attribute bytesize.
- #codec_id ⇒ Object (readonly)
  Returns the value of attribute codec_id.
- #create_time ⇒ Object (readonly)
  Returns the value of attribute create_time.
- #key ⇒ Object (readonly)
  Returns the value of attribute key.
- #offset ⇒ Object (readonly)
  Returns the value of attribute offset.
- #value ⇒ Object (readonly)
  Returns the value of attribute value.
Class Method Summary
- .decode(decoder) ⇒ Object
Instance Method Summary
- #==(other) ⇒ Object
- #compressed? ⇒ Boolean
- #decompress ⇒ Array<Kafka::Protocol::Message>
- #encode(encoder) ⇒ Object
- #headers ⇒ Object
- #initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1) ⇒ Message (constructor)
  A new instance of Message.
- #is_control_record ⇒ Object
  Ensures backward compatibility with the message format from Kafka 0.11.x.
Constructor Details
#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1) ⇒ Message
Returns a new instance of Message.
    # File 'lib/kafka/protocol/message.rb', line 26
    def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
      @key = key
      @value = value
      @codec_id = codec_id
      @offset = offset
      @create_time = create_time
      @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
    end
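The `bytesize` computed in the constructor counts bytes, not characters, and a nil key contributes nothing because `nil.to_s` is the empty string. A small sketch of that expression on its own, using a hypothetical `payload_bytesize` helper rather than the class itself:

```ruby
# Hypothetical stand-in for the expression in the constructor;
# not part of ruby-kafka.
def payload_bytesize(key, value)
  key.to_s.bytesize + value.to_s.bytesize
end

no_key    = payload_bytesize(nil, "hello")       # nil.to_s is "", so only the value counts
multibyte = payload_bytesize("id", "h\u00E9llo") # "é" takes two bytes in UTF-8

puts no_key
puts multibyte
```

With the real class, only `value:` is required, so a call such as `Kafka::Protocol::Message.new(value: "hello")` would leave `key` as nil, `codec_id` as 0, and `offset` as -1.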
Instance Attribute Details
#bytesize ⇒ Object (readonly)
Returns the value of attribute bytesize.
    # File 'lib/kafka/protocol/message.rb', line 24
    def bytesize
      @bytesize
    end
#codec_id ⇒ Object (readonly)
Returns the value of attribute codec_id.
    # File 'lib/kafka/protocol/message.rb', line 22
    def codec_id
      @codec_id
    end
#create_time ⇒ Object (readonly)
Returns the value of attribute create_time.
    # File 'lib/kafka/protocol/message.rb', line 24
    def create_time
      @create_time
    end
#key ⇒ Object (readonly)
Returns the value of attribute key.
    # File 'lib/kafka/protocol/message.rb', line 22
    def key
      @key
    end
#offset ⇒ Object (readonly)
Returns the value of attribute offset.
    # File 'lib/kafka/protocol/message.rb', line 22
    def offset
      @offset
    end
#value ⇒ Object (readonly)
Returns the value of attribute value.
    # File 'lib/kafka/protocol/message.rb', line 22
    def value
      @value
    end
Class Method Details
.decode(decoder) ⇒ Object
    # File 'lib/kafka/protocol/message.rb', line 66
    def self.decode(decoder)
      offset = decoder.int64
      message_decoder = Decoder.from_string(decoder.bytes)

      _crc = message_decoder.int32
      magic_byte = message_decoder.int8
      attributes = message_decoder.int8

      # The magic byte indicates the message format version. There are situations
      # where an old message format can be returned from a newer version of Kafka,
      # because old messages are not necessarily rewritten on upgrades.
      case magic_byte
      when 0
        # No timestamp in the pre-0.10 message format.
        timestamp = nil
      when 1
        timestamp = message_decoder.int64

        # If the timestamp is set to zero, it's because the message has been upgraded
        # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
        # have a timestamp attribute, so we'll just set the timestamp to nil.
        timestamp = nil if timestamp.zero?
      else
        raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
      end

      key = message_decoder.bytes
      value = message_decoder.bytes

      # The codec id is encoded in the three least significant bits of the
      # attributes.
      codec_id = attributes & 0b111

      # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
      create_time = timestamp && Time.at(timestamp / 1000.0)

      new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
    end
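Two steps of the decode method above are easy to try in isolation: masking the codec id out of the attributes byte, and turning the millisecond timestamp into a `Time`. The helpers below are hypothetical and only mirror those two expressions; they are not part of the library.

```ruby
# Hypothetical helpers mirroring two steps of Message.decode.
CODEC_MASK = 0b111

# The codec id lives in the three least significant bits of the attributes.
def codec_id_from(attributes)
  attributes & CODEC_MASK
end

# A zero (or missing) timestamp yields nil; otherwise the millisecond
# value is converted to a Time with fractional seconds.
def create_time_from(timestamp_ms)
  timestamp_ms = nil if timestamp_ms && timestamp_ms.zero?
  timestamp_ms && Time.at(timestamp_ms / 1000.0)
end

codec_id  = codec_id_from(0b0000_1010) # upper bits are ignored by the mask
upgraded  = create_time_from(0)        # treated as "no timestamp"
timestamp = create_time_from(1_500_000_000_500)

puts codec_id
puts upgraded.inspect
puts timestamp.to_i
```

Dividing by `1000.0` rather than `1000` keeps the sub-second part of the timestamp.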
Instance Method Details
#==(other) ⇒ Object
    # File 'lib/kafka/protocol/message.rb', line 43
    def ==(other)
      @key == other.key &&
        @value == other.value &&
        @codec_id == other.codec_id &&
        @offset == other.offset
    end
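The comparison above looks at key, value, codec id, and offset only, so two messages that differ just in `create_time` compare as equal. A sketch with a hypothetical stand-in class (`SketchMessage`, which copies only the constructor fields and the comparison, not the real class):

```ruby
# Hypothetical stand-in that reuses the same comparison as Message#==.
class SketchMessage
  attr_reader :key, :value, :codec_id, :offset, :create_time

  def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
    @key = key
    @value = value
    @codec_id = codec_id
    @offset = offset
    @create_time = create_time
  end

  # create_time is deliberately not part of the comparison.
  def ==(other)
    @key == other.key &&
      @value == other.value &&
      @codec_id == other.codec_id &&
      @offset == other.offset
  end
end

a = SketchMessage.new(value: "v", key: "k", create_time: Time.at(0))
b = SketchMessage.new(value: "v", key: "k", create_time: Time.at(60))
c = SketchMessage.new(value: "v", key: "k", offset: 5)

puts a == b # same key, value, codec id, and offset
puts a == c # offsets differ
```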
#compressed? ⇒ Boolean
    # File 'lib/kafka/protocol/message.rb', line 50
    def compressed?
      @codec_id != 0
    end
#decompress ⇒ Array<Kafka::Protocol::Message>
    # File 'lib/kafka/protocol/message.rb', line 55
    def decompress
      codec = Compression.find_codec_by_id(@codec_id)

      # For some weird reason we need to cut out the first 20 bytes.
      data = codec.decompress(value)
      message_set_decoder = Decoder.from_string(data)
      message_set = MessageSet.decode(message_set_decoder)

      correct_offsets(message_set.messages)
    end
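The idea behind decompress is that the value of a compressed message is itself a compressed, encoded message set. The round trip can be sketched with `Zlib.gzip` and `Zlib.gunzip` from the standard library. `GzipSketchCodec` is a hypothetical stand-in for whatever `Compression.find_codec_by_id` returns, and the inner payload is a placeholder string rather than a real encoded message set.

```ruby
require "zlib"

# Hypothetical stand-in for a codec object; ruby-kafka's own codec
# classes are not used here.
module GzipSketchCodec
  def self.compress(data)
    Zlib.gzip(data)
  end

  def self.decompress(data)
    Zlib.gunzip(data)
  end
end

# Placeholder bytes standing in for an encoded inner message set.
inner = "encoded inner message set".b

wrapper_value = GzipSketchCodec.compress(inner)       # what the wrapper's value would hold
restored = GzipSketchCodec.decompress(wrapper_value)  # what decompress hands to the decoder

puts restored == inner
```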
#encode(encoder) ⇒ Object
    # File 'lib/kafka/protocol/message.rb', line 36
    def encode(encoder)
      data = encode_with_crc

      encoder.write_int64(offset)
      encoder.write_bytes(data)
    end
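The framing produced by encode can be sketched without the encoder object. The sketch assumes, as is usual in the Kafka protocol, that `write_int64` emits a big-endian signed 64-bit integer and that `write_bytes` emits an int32 length followed by the raw bytes. `frame_message` is a hypothetical helper.

```ruby
# Hypothetical helper: offset as int64, then the payload as
# int32 length + bytes (assumed behaviour of write_bytes).
def frame_message(offset, data)
  bytes = data.b
  [offset].pack("q>") + [bytes.bytesize].pack("l>") + bytes
end

framed = frame_message(-1, "payload")
offset, size = framed.unpack("q>l>")

puts framed.bytesize
puts offset
puts size
```

An offset of -1 is the constructor default, so a freshly built message would be framed with that value until an offset is assigned elsewhere.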
#headers ⇒ Object
    # File 'lib/kafka/protocol/message.rb', line 110
    def headers
      {}
    end
#is_control_record ⇒ Object
Ensures backward compatibility with the message format from Kafka 0.11.x. Always returns false.
    # File 'lib/kafka/protocol/message.rb', line 106
    def is_control_record
      false
    end