Class: Kafka::Protocol::RecordBatch

Inherits: Object
Defined in:
lib/kafka/protocol/record_batch.rb

Constant Summary collapse

MAGIC_BYTE =
2
RECORD_BATCH_OVERHEAD =

The size of metadata before the real record data

49
CODEC_ID_MASK =

Masks to extract information from attributes

0b00000111
IN_TRANSACTION_MASK =
0b00010000
IS_CONTROL_BATCH_MASK =
0b00100000
TIMESTAMP_TYPE_MASK =
0b001000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now) ⇒ RecordBatch

Returns a new instance of RecordBatch.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kafka/protocol/record_batch.rb', line 21

# Builds a record batch. All arguments are keyword arguments with defaults,
# so an empty batch can be created with no arguments at all.
#
# records:                the records carried by the batch (coerced to Array)
# first_offset:           absolute offset of the first record
# first_timestamp:        timestamp of the first record
# partition_leader_epoch: partition leader epoch at write time
# codec_id:               compression codec id (0 = uncompressed)
# in_transaction:         whether the batch is part of a transaction
# is_control_batch:       whether the batch carries control records
# last_offset_delta:      offset delta of the last record
# producer_id:            producer id (-1 = none)
# producer_epoch:         producer epoch
# first_sequence:         sequence number of the first record
# max_timestamp:          largest record timestamp
def initialize(
    records: [],
    first_offset: 0,
    first_timestamp: Time.now,
    partition_leader_epoch: 0,
    codec_id: 0,
    in_transaction: false,
    is_control_batch: false,
    last_offset_delta: 0,
    producer_id: -1,
    producer_epoch: 0,
    first_sequence: 0,
    max_timestamp: Time.now
)
  # Coerce nil or a single record into an array.
  @records = Array(records)

  # Batch placement and payload encoding.
  @first_offset = first_offset
  @partition_leader_epoch = partition_leader_epoch
  @codec_id = codec_id

  # Records verification metadata.
  @first_timestamp = first_timestamp
  @max_timestamp = max_timestamp
  @last_offset_delta = last_offset_delta

  # Transaction / idempotence metadata.
  @producer_id = producer_id
  @producer_epoch = producer_epoch
  @first_sequence = first_sequence
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch

  # Tag the first record when this is a control batch.
  mark_control_record
end

Instance Attribute Details

#codec_idObject

Returns the value of attribute codec_id.



19
20
21
# File 'lib/kafka/protocol/record_batch.rb', line 19

# Compression codec id for the record payload (0 = no compression).
def codec_id
  @codec_id
end

#first_offsetObject (readonly)

Returns the value of attribute first_offset.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Absolute offset of the first record in the batch.
def first_offset
  @first_offset
end

#first_sequenceObject (readonly)

Returns the value of attribute first_sequence.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Sequence number of the first record in the batch.
def first_sequence
  @first_sequence
end

#first_timestampObject (readonly)

Returns the value of attribute first_timestamp.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Timestamp (Time) of the first record in the batch.
def first_timestamp
  @first_timestamp
end

#in_transactionObject (readonly)

Returns the value of attribute in_transaction.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Whether the batch is part of a transaction (decoded from the
# IN_TRANSACTION_MASK bit of the attributes field).
def in_transaction
  @in_transaction
end

#is_control_batchObject (readonly)

Returns the value of attribute is_control_batch.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Whether this is a control batch (decoded from the
# IS_CONTROL_BATCH_MASK bit of the attributes field).
def is_control_batch
  @is_control_batch
end

#last_offset_deltaObject (readonly)

Returns the value of attribute last_offset_delta.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Offset delta of the last record, relative to first_offset.
def last_offset_delta
  @last_offset_delta
end

#max_timestampObject (readonly)

Returns the value of attribute max_timestamp.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Largest record timestamp (Time) in the batch.
def max_timestamp
  @max_timestamp
end

#partition_leader_epochObject (readonly)

Returns the value of attribute partition_leader_epoch.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Partition leader epoch recorded for the batch.
def partition_leader_epoch
  @partition_leader_epoch
end

#producer_epochObject (readonly)

Returns the value of attribute producer_epoch.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Producer epoch recorded for the batch.
def producer_epoch
  @producer_epoch
end

#producer_idObject (readonly)

Returns the value of attribute producer_id.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Producer id recorded for the batch (-1 by default, per #initialize).
def producer_id
  @producer_id
end

#recordsObject (readonly)

Returns the value of attribute records.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# The array of records carried by this batch.
def records
  @records
end

Class Method Details

.decode(decoder) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/kafka/protocol/record_batch.rb', line 152

# Decodes a RecordBatch from the wire format: an int64 base offset followed
# by a length-prefixed batch payload (header + record array).
#
# Raises InsufficientDataMessage when the decoded record count does not
# match the header, or when the payload is truncated (EOFError).
def self.decode(decoder)
  first_offset = decoder.int64

  # The batch payload is length-prefixed; decode it from its own buffer.
  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is used to distinguish legacy MessageSet and
  # RecordBatch. Therefore, we don't care about magic byte here yet.
  _magic_byte = record_batch_decoder.int8
  # CRC is read to advance the cursor but is not verified here.
  _crc = record_batch_decoder.int32

  # The int16 attributes field packs several flags into bit ranges; the
  # *_MASK constants extract each one.
  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attributes & TIMESTAMP_TYPE_MASK) != 0

  last_offset_delta = record_batch_decoder.int32
  # Timestamps arrive as epoch milliseconds; BigDecimal division avoids
  # float rounding when converting to seconds.
  first_timestamp = Time.at(record_batch_decoder.int64 / BigDecimal(1000))
  max_timestamp = Time.at(record_batch_decoder.int64 / BigDecimal(1000))

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16
  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  # Everything after the fixed-size header is the (possibly compressed)
  # concatenation of the individual records.
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    # Per-record fields are stored as deltas relative to the batch header.
    record.offset = first_offset + record.offset_delta
    record.create_time = log_append_time && max_timestamp ? max_timestamp : first_timestamp + record.timestamp_delta / BigDecimal(1000)
    records_array << record
  end

  # The header's record count must match what was actually decoded.
  raise InsufficientDataMessage if records_array.length != records_array_length

  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end

Instance Method Details

#==(other) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/kafka/protocol/record_batch.rb', line 140

# Two batches are considered equal when all compared fields match.
# Note: codec_id and timestamps are deliberately not part of the comparison.
def ==(other)
  %i[
    records
    first_offset
    partition_leader_epoch
    in_transaction
    is_control_batch
    last_offset_delta
    producer_id
    producer_epoch
    first_sequence
  ].all? { |field| public_send(field) == other.public_send(field) }
end

#attributesObject



64
65
66
67
68
# File 'lib/kafka/protocol/record_batch.rb', line 64

# Packs the batch flags into the int16 attributes field: the codec id in
# the low bits, plus the transactional and control-batch flag bits.
def attributes
  flags = @codec_id
  flags |= IN_TRANSACTION_MASK if @in_transaction
  flags |= IS_CONTROL_BATCH_MASK if @is_control_batch
  flags
end

#compressed?Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/kafka/protocol/record_batch.rb', line 122

# True when the record payload is compressed (codec id 0 means "none").
def compressed?
  !(@codec_id == 0)
end

#encode(encoder) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kafka/protocol/record_batch.rb', line 70

# Writes the batch to +encoder+: first the base offset, then the batch
# payload as a length-prefixed byte blob containing the partition leader
# epoch, the magic byte, a CRC-32C checksum, and the batch body.
def encode(encoder)
  encoder.write_int64(@first_offset)

  record_batch_buffer = StringIO.new
  record_batch_encoder = Encoder.new(record_batch_buffer)

  record_batch_encoder.write_int32(@partition_leader_epoch)
  record_batch_encoder.write_int8(MAGIC_BYTE)

  # The CRC covers only the body (everything after the CRC field itself).
  body = encode_record_batch_body
  crc = ::Digest::CRC32c.checksum(body)

  record_batch_encoder.write_int32(crc)
  record_batch_encoder.write(body)

  encoder.write_bytes(record_batch_buffer.string)
end

#encode_record_arrayObject



113
114
115
116
117
118
119
120
# File 'lib/kafka/protocol/record_batch.rb', line 113

# Serializes each record back-to-back into a single binary string.
def encode_record_array
  io = StringIO.new
  record_encoder = Encoder.new(io)
  @records.each { |record| record.encode(record_encoder) }
  io.string
end

#encode_record_batch_bodyObject



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/kafka/protocol/record_batch.rb', line 88

# Serializes the batch body (everything covered by the CRC): attributes,
# offset/timestamp metadata, producer metadata, record count, and the
# (possibly compressed) record array.
def encode_record_batch_body
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int16(attributes)
  encoder.write_int32(@last_offset_delta)
  # Timestamps are serialized as epoch milliseconds.
  encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  encoder.write_int64(@producer_id)
  encoder.write_int16(@producer_epoch)
  encoder.write_int32(@first_sequence)

  encoder.write_int32(@records.length)

  # Only the record array is compressed; the preceding fields stay plain.
  records_array = encode_record_array
  if compressed?
    codec = Compression.find_codec_by_id(@codec_id)
    records_array = codec.compress(records_array)
  end
  encoder.write(records_array)

  buffer.string
end

#fulfill_relative_dataObject



126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/kafka/protocol/record_batch.rb', line 126

# Derives the batch-relative metadata from the records themselves:
# first/max timestamps, each record's offset and timestamp deltas, and
# the last offset delta. Falls back to Time.now for an empty batch.
def fulfill_relative_data
  earliest, latest = records.minmax_by(&:create_time)
  @first_timestamp = earliest.nil? ? Time.now : earliest.create_time
  @max_timestamp = latest.nil? ? Time.now : latest.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    # Timestamp deltas are stored in milliseconds.
    record.timestamp_delta = ((record.create_time - first_timestamp) * 1000).to_i
  end
  @last_offset_delta = records.length - 1
end

#last_offsetObject



60
61
62
# File 'lib/kafka/protocol/record_batch.rb', line 60

# Absolute offset of the final record in the batch.
def last_offset
  @last_offset_delta + @first_offset
end

#mark_control_recordObject



215
216
217
218
219
220
# File 'lib/kafka/protocol/record_batch.rb', line 215

# For control batches, tags the first record (if any) as a control record.
# No-op for regular batches and for empty control batches.
def mark_control_record
  return unless is_control_batch

  head = @records.first
  head.is_control_record = true unless head.nil?
end

#sizeObject



56
57
58
# File 'lib/kafka/protocol/record_batch.rb', line 56

# Number of records held by this batch.
def size
  @records.length
end