Class: Kafka::Protocol::RecordBatch

Inherits:
Object
Defined in:
lib/kafka/protocol/record_batch.rb

Constant Summary collapse

MAGIC_BYTE =
2
RECORD_BATCH_OVERHEAD =

The size of metadata before the real record data

49
CODEC_ID_MASK =

Masks to extract information from attributes

0b00000111
IN_TRANSACTION_MASK =
0b00010000
IS_CONTROL_BATCH_MASK =
0b00100000
TIMESTAMP_TYPE_MASK =
0b001000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now) ⇒ RecordBatch

Returns a new instance of RecordBatch.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/kafka/protocol/record_batch.rb', line 20

# Builds a RecordBatch (v2 message format).
#
# Defaults describe a fresh, uncompressed, non-transactional batch.
# `records` is coerced with Array() so a single record may be passed.
# When the batch is a transactional control batch, the first record
# is flagged via #mark_control_record.
def initialize(
    records: [],
    first_offset: 0,
    first_timestamp: Time.now,
    partition_leader_epoch: 0,
    codec_id: 0,
    in_transaction: false,
    is_control_batch: false,
    last_offset_delta: 0,
    producer_id: -1,
    producer_epoch: 0,
    first_sequence: 0,
    max_timestamp: Time.now
)
  # Payload and batch position
  @records = Array(records)
  @first_offset = first_offset
  @partition_leader_epoch = partition_leader_epoch

  # Timestamp / offset bookkeeping used for records verification
  @first_timestamp = first_timestamp
  @max_timestamp = max_timestamp
  @last_offset_delta = last_offset_delta

  # Compression codec (0 = none)
  @codec_id = codec_id

  # Transaction / idempotent-producer state
  @producer_id = producer_id
  @producer_epoch = producer_epoch
  @first_sequence = first_sequence
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch

  mark_control_record
end

Instance Attribute Details

#codec_id ⇒ Object

Returns the value of attribute codec_id.



18
19
20
# File 'lib/kafka/protocol/record_batch.rb', line 18

def codec_id
  @codec_id
end

#first_offset ⇒ Object (readonly)

Returns the value of attribute first_offset.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def first_offset
  @first_offset
end

#first_sequence ⇒ Object (readonly)

Returns the value of attribute first_sequence.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def first_sequence
  @first_sequence
end

#first_timestamp ⇒ Object (readonly)

Returns the value of attribute first_timestamp.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def first_timestamp
  @first_timestamp
end

#in_transaction ⇒ Object (readonly)

Returns the value of attribute in_transaction.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def in_transaction
  @in_transaction
end

#is_control_batch ⇒ Object (readonly)

Returns the value of attribute is_control_batch.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def is_control_batch
  @is_control_batch
end

#last_offset_delta ⇒ Object (readonly)

Returns the value of attribute last_offset_delta.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def last_offset_delta
  @last_offset_delta
end

#max_timestamp ⇒ Object (readonly)

Returns the value of attribute max_timestamp.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def max_timestamp
  @max_timestamp
end

#partition_leader_epoch ⇒ Object (readonly)

Returns the value of attribute partition_leader_epoch.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def partition_leader_epoch
  @partition_leader_epoch
end

#producer_epoch ⇒ Object (readonly)

Returns the value of attribute producer_epoch.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def producer_epoch
  @producer_epoch
end

#producer_id ⇒ Object (readonly)

Returns the value of attribute producer_id.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def producer_id
  @producer_id
end

#records ⇒ Object (readonly)

Returns the value of attribute records.



16
17
18
# File 'lib/kafka/protocol/record_batch.rb', line 16

def records
  @records
end

Class Method Details

.decode(decoder) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/kafka/protocol/record_batch.rb', line 151

def self.decode(decoder)
  first_offset = decoder.int64

  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is used to distingush legacy MessageSet and
  # RecordBatch. Therefore, we don't care about magic byte here yet.
  _magic_byte = record_batch_decoder.int8
  _crc = record_batch_decoder.int32

  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attributes & TIMESTAMP_TYPE_MASK) != 0

  last_offset_delta = record_batch_decoder.int32
  first_timestamp = Time.at(record_batch_decoder.int64 / 1000)
  max_timestamp = Time.at(record_batch_decoder.int64 / 1000)

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16
  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    record.offset = first_offset + record.offset_delta
    record.create_time = log_append_time && max_timestamp ? max_timestamp : first_timestamp + record.timestamp_delta
    records_array << record
  end

  raise InsufficientDataMessage if records_array.length != records_array_length

  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end

Instance Method Details

#==(other) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/kafka/protocol/record_batch.rb', line 139

def ==(other)
  records == other.records &&
    first_offset == other.first_offset &&
    partition_leader_epoch == other.partition_leader_epoch &&
    in_transaction == other.in_transaction &&
    is_control_batch == other.is_control_batch &&
    last_offset_delta == other.last_offset_delta &&
    producer_id == other.producer_id &&
    producer_epoch == other.producer_epoch &&
    first_sequence == other.first_sequence
end

#attributes ⇒ Object



63
64
65
66
67
# File 'lib/kafka/protocol/record_batch.rb', line 63

def attributes
  0x0000 | @codec_id |
    (@in_transaction ? IN_TRANSACTION_MASK : 0x0) |
    (@is_control_batch ? IS_CONTROL_BATCH_MASK : 0x0)
end

#compressed? ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/kafka/protocol/record_batch.rb', line 121

def compressed?
  @codec_id != 0
end

#encode(encoder) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/kafka/protocol/record_batch.rb', line 69

def encode(encoder)
  encoder.write_int64(@first_offset)

  record_batch_buffer = StringIO.new
  record_batch_encoder = Encoder.new(record_batch_buffer)

  record_batch_encoder.write_int32(@partition_leader_epoch)
  record_batch_encoder.write_int8(MAGIC_BYTE)

  body = encode_record_batch_body
  crc = Digest::CRC32c.checksum(body)

  record_batch_encoder.write_int32(crc)
  record_batch_encoder.write(body)

  encoder.write_bytes(record_batch_buffer.string)
end

#encode_record_array ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/kafka/protocol/record_batch.rb', line 112

def encode_record_array
  buffer = StringIO.new
  encoder = Encoder.new(buffer)
  @records.each do |record|
    record.encode(encoder)
  end
  buffer.string
end

#encode_record_batch_body ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/kafka/protocol/record_batch.rb', line 87

def encode_record_batch_body
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int16(attributes)
  encoder.write_int32(@last_offset_delta)
  encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  encoder.write_int64(@producer_id)
  encoder.write_int16(@producer_epoch)
  encoder.write_int32(@first_sequence)

  encoder.write_int32(@records.length)

  records_array = encode_record_array
  if compressed?
    codec = Compression.find_codec_by_id(@codec_id)
    records_array = codec.compress(records_array)
  end
  encoder.write(records_array)

  buffer.string
end

#fulfill_relative_data ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/kafka/protocol/record_batch.rb', line 125

def fulfill_relative_data
  first_record = records.min_by { |record| record.create_time }
  @first_timestamp = first_record.nil? ? Time.now : first_record.create_time

  last_record = records.max_by { |record| record.create_time }
  @max_timestamp = last_record.nil? ? Time.now : last_record.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    record.timestamp_delta = (record.create_time - first_timestamp).to_i
  end
  @last_offset_delta = records.length - 1
end

#last_offset ⇒ Object



59
60
61
# File 'lib/kafka/protocol/record_batch.rb', line 59

def last_offset
  @first_offset + @last_offset_delta
end

#mark_control_record ⇒ Object



214
215
216
217
218
219
# File 'lib/kafka/protocol/record_batch.rb', line 214

def mark_control_record
  if in_transaction && is_control_batch
    record = @records.first
    record.is_control_record = true unless record.nil?
  end
end

#size ⇒ Object



55
56
57
# File 'lib/kafka/protocol/record_batch.rb', line 55

def size
  @records.size
end