Class: Kafka::Protocol::Record

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/record.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false) ⇒ Record

Returns a new instance of Record.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/kafka/protocol/record.rb', line 7

def initialize(
  key: nil,
  value:,
  headers: {},
  attributes: 0,
  offset_delta: 0,
  offset: 0,
  timestamp_delta: 0,
  create_time: Time.now,
  is_control_record: false
)
  @key = key
  @value = value
  @headers = headers
  @attributes = attributes

  @offset_delta = offset_delta
  @offset = offset
  @timestamp_delta = timestamp_delta
  @create_time = create_time
  @is_control_record = is_control_record

  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end

Instance Attribute Details

#attributesObject (readonly)

Returns the value of attribute attributes.



4
5
6
# File 'lib/kafka/protocol/record.rb', line 4

def attributes
  @attributes
end

#bytesizeObject (readonly)

Returns the value of attribute bytesize.



4
5
6
# File 'lib/kafka/protocol/record.rb', line 4

def bytesize
  @bytesize
end

#create_timeObject

Returns the value of attribute create_time.



5
6
7
# File 'lib/kafka/protocol/record.rb', line 5

def create_time
  @create_time
end

#headersObject (readonly)

Returns the value of attribute headers.



4
5
6
# File 'lib/kafka/protocol/record.rb', line 4

def headers
  @headers
end

#is_control_recordObject

Returns the value of attribute is_control_record.



5
6
7
# File 'lib/kafka/protocol/record.rb', line 5

def is_control_record
  @is_control_record
end

#keyObject (readonly)

Returns the value of attribute key.



4
5
6
# File 'lib/kafka/protocol/record.rb', line 4

def key
  @key
end

#offsetObject

Returns the value of attribute offset.



5
6
7
# File 'lib/kafka/protocol/record.rb', line 5

def offset
  @offset
end

#offset_deltaObject

Returns the value of attribute offset_delta.



5
6
7
# File 'lib/kafka/protocol/record.rb', line 5

def offset_delta
  @offset_delta
end

#timestamp_deltaObject

Returns the value of attribute timestamp_delta.



5
6
7
# File 'lib/kafka/protocol/record.rb', line 5

def timestamp_delta
  @timestamp_delta
end

#valueObject (readonly)

Returns the value of attribute value.



4
5
6
# File 'lib/kafka/protocol/record.rb', line 4

def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/kafka/protocol/record.rb', line 59

def self.decode(decoder)
  record_decoder = Decoder.from_string(decoder.varint_bytes)

  attributes = record_decoder.int8
  timestamp_delta = record_decoder.varint
  offset_delta = record_decoder.varint

  key = record_decoder.varint_string
  value = record_decoder.varint_bytes

  headers = {}
  record_decoder.varint_array do
    header_key = record_decoder.varint_string
    header_value = record_decoder.varint_bytes

    headers[header_key] = header_value
  end

  new(
    key: key,
    value: value,
    headers: headers,
    attributes: attributes,
    offset_delta: offset_delta,
    timestamp_delta: timestamp_delta
  )
end

Instance Method Details

#==(other) ⇒ Object



52
53
54
55
56
57
# File 'lib/kafka/protocol/record.rb', line 52

def ==(other)
  offset_delta == other.offset_delta &&
    timestamp_delta == other.timestamp_delta &&
    offset == other.offset &&
    is_control_record == other.is_control_record
end

#encode(encoder) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/kafka/protocol/record.rb', line 32

def encode(encoder)
  record_buffer = StringIO.new

  record_encoder = Encoder.new(record_buffer)

  record_encoder.write_int8(@attributes)
  record_encoder.write_varint(@timestamp_delta)
  record_encoder.write_varint(@offset_delta)

  record_encoder.write_varint_string(@key)
  record_encoder.write_varint_bytes(@value)

  record_encoder.write_varint_array(@headers.to_a) do |header_key, header_value|
    record_encoder.write_varint_string(header_key.to_s)
    record_encoder.write_varint_bytes(header_value.to_s)
  end

  encoder.write_varint_bytes(record_buffer.string)
end