Class: Kafka::Protocol::RecordBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/record_batch.rb

Constant Summary collapse

MAGIC_BYTE =
2
RECORD_BATCH_OVERHEAD =

The size of metadata before the real record data

49
CODEC_ID_MASK =

Masks to extract information from attributes

0b00000111
IN_TRANSACTION_MASK =
0b00010000
IS_CONTROL_BATCH_MASK =
0b00100000
TIMESTAMP_TYPE_MASK =
0b001000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now) ⇒ RecordBatch

Returns a new instance of RecordBatch


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kafka/protocol/record_batch.rb', line 21

# Builds a record batch (Kafka message format v2).
#
# Defaults produce an empty, uncompressed, non-transactional batch.
def initialize(
    records: [],
    first_offset: 0,
    first_timestamp: Time.now,
    partition_leader_epoch: 0,
    codec_id: 0,
    in_transaction: false,
    is_control_batch: false,
    last_offset_delta: 0,
    producer_id: -1,
    producer_epoch: 0,
    first_sequence: 0,
    max_timestamp: Time.now
)
  # Array() tolerates a single record (or nil) as well as an array.
  @records = Array(records)

  # Offset / timestamp bookkeeping.
  @first_offset = first_offset
  @first_timestamp = first_timestamp
  @last_offset_delta = last_offset_delta
  @max_timestamp = max_timestamp

  # Compression and batch-level flags.
  @codec_id = codec_id
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch
  @partition_leader_epoch = partition_leader_epoch

  # Idempotent / transactional producer state.
  @producer_id = producer_id
  @producer_epoch = producer_epoch
  @first_sequence = first_sequence

  # Flag the control record (if any) now that all state is in place.
  mark_control_record
end

Instance Attribute Details

#codec_idObject

Returns the value of attribute codec_id


19
20
21
# File 'lib/kafka/protocol/record_batch.rb', line 19

# Compression codec id for this batch (0 means uncompressed).
def codec_id; @codec_id; end

#first_offsetObject (readonly)

Returns the value of attribute first_offset


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Absolute offset of the first record in the batch.
def first_offset; @first_offset; end

#first_sequenceObject (readonly)

Returns the value of attribute first_sequence


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Producer sequence number of the first record.
def first_sequence; @first_sequence; end

#first_timestampObject (readonly)

Returns the value of attribute first_timestamp


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Base timestamp of the batch; per-record timestamps are deltas from it.
def first_timestamp; @first_timestamp; end

#in_transactionObject (readonly)

Returns the value of attribute in_transaction


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Whether this batch is part of a transaction.
def in_transaction; @in_transaction; end

#is_control_batchObject (readonly)

Returns the value of attribute is_control_batch


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Whether this batch carries a control record (transaction marker).
def is_control_batch; @is_control_batch; end

#last_offset_deltaObject (readonly)

Returns the value of attribute last_offset_delta


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Offset delta of the last record relative to first_offset.
def last_offset_delta; @last_offset_delta; end

#max_timestampObject (readonly)

Returns the value of attribute max_timestamp


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Largest timestamp among the records in the batch.
def max_timestamp; @max_timestamp; end

#partition_leader_epochObject (readonly)

Returns the value of attribute partition_leader_epoch


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Leader epoch of the partition this batch was produced to.
def partition_leader_epoch; @partition_leader_epoch; end

#producer_epochObject (readonly)

Returns the value of attribute producer_epoch


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Epoch of the producer that wrote this batch.
def producer_epoch; @producer_epoch; end

#producer_idObject (readonly)

Returns the value of attribute producer_id


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# Id of the producer that wrote this batch (-1 when unset; see initialize default).
def producer_id; @producer_id; end

#recordsObject (readonly)

Returns the value of attribute records


17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

# The records contained in this batch.
def records; @records; end

Class Method Details

.decode(decoder) ⇒ Object


152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/kafka/protocol/record_batch.rb', line 152

# Decodes a v2 RecordBatch from +decoder+.
#
# @param decoder [Decoder] positioned at the start of a record batch
# @return [RecordBatch]
# @raise [InsufficientDataMessage] when the payload is truncated or the
#   decoded record count differs from the declared one
def self.decode(decoder)
  first_offset = decoder.int64

  # The batch body is length-prefixed; decode it from its own buffer.
  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is used to distinguish legacy MessageSet and
  # RecordBatch. Therefore, we don't care about magic byte here yet.
  _magic_byte = record_batch_decoder.int8
  # NOTE(review): the CRC is read but not verified here.
  _crc = record_batch_decoder.int32

  # Unpack the bit-packed attributes field (see the *_MASK constants).
  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0
  log_append_time = (attributes & TIMESTAMP_TYPE_MASK) != 0

  last_offset_delta = record_batch_decoder.int32
  # Timestamps travel as epoch milliseconds; BigDecimal division avoids
  # floating-point rounding when converting to Time.
  first_timestamp = Time.at(record_batch_decoder.int64 / BigDecimal(1000))
  max_timestamp = Time.at(record_batch_decoder.int64 / BigDecimal(1000))

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16
  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  # Everything after the fixed-size header (RECORD_BATCH_OVERHEAD bytes)
  # is the possibly-compressed record array.
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    # Per-record offset/timestamp are stored relative to the batch base.
    record.offset = first_offset + record.offset_delta
    record.create_time = log_append_time && max_timestamp ? max_timestamp : first_timestamp + record.timestamp_delta / BigDecimal(1000)
    records_array << record
  end

  raise InsufficientDataMessage if records_array.length != records_array_length

  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end

Instance Method Details

#==(other) ⇒ Object


140
141
142
143
144
145
146
147
148
149
150
# File 'lib/kafka/protocol/record_batch.rb', line 140

# Batches are equal when all wire-relevant fields match.
# (Timestamps and codec id are deliberately not compared.)
def ==(other)
  %i[
    records first_offset partition_leader_epoch in_transaction
    is_control_batch last_offset_delta producer_id producer_epoch
    first_sequence
  ].all? { |field| public_send(field) == other.public_send(field) }
end

#attributesObject


64
65
66
67
68
# File 'lib/kafka/protocol/record_batch.rb', line 64

# Packs the codec id and batch-level flags into the int16 attributes field.
def attributes
  bits = @codec_id
  bits |= IN_TRANSACTION_MASK if @in_transaction
  bits |= IS_CONTROL_BATCH_MASK if @is_control_batch
  bits
end

#compressed?Boolean

Returns:

  • (Boolean)

122
123
124
# File 'lib/kafka/protocol/record_batch.rb', line 122

# True when a compression codec (non-zero codec id) is configured.
def compressed?
  !(@codec_id == 0)
end

#encode(encoder) ⇒ Object


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kafka/protocol/record_batch.rb', line 70

# Writes this batch to +encoder+ in the v2 wire format: the first offset,
# followed by a length-prefixed blob containing partition leader epoch,
# magic byte, CRC-32C checksum, and the batch body.
def encode(encoder)
  encoder.write_int64(@first_offset)

  record_batch_buffer = StringIO.new
  record_batch_encoder = Encoder.new(record_batch_buffer)

  record_batch_encoder.write_int32(@partition_leader_epoch)
  record_batch_encoder.write_int8(MAGIC_BYTE)

  # The checksum covers only the body (attributes through the records),
  # which is everything written after the CRC itself.
  body = encode_record_batch_body
  crc = Digest::CRC32c.checksum(body)

  record_batch_encoder.write_int32(crc)
  record_batch_encoder.write(body)

  encoder.write_bytes(record_batch_buffer.string)
end

#encode_record_arrayObject


113
114
115
116
117
118
119
120
# File 'lib/kafka/protocol/record_batch.rb', line 113

# Serializes every record in the batch and returns the raw bytes.
def encode_record_array
  io = StringIO.new
  array_encoder = Encoder.new(io)
  @records.each { |record| record.encode(array_encoder) }
  io.string
end

#encode_record_batch_bodyObject


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/kafka/protocol/record_batch.rb', line 88

# Builds the batch body that the CRC in #encode is computed over:
# attributes, offset/timestamp metadata, producer state, record count,
# and the (optionally compressed) record array — in that wire order.
def encode_record_batch_body
  buffer = StringIO.new
  encoder = Encoder.new(buffer)

  encoder.write_int16(attributes)
  encoder.write_int32(@last_offset_delta)
  # Timestamps go on the wire as epoch milliseconds.
  encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  encoder.write_int64(@producer_id)
  encoder.write_int16(@producer_epoch)
  encoder.write_int32(@first_sequence)

  # The count reflects the uncompressed record array.
  encoder.write_int32(@records.length)

  records_array = encode_record_array
  if compressed?
    codec = Compression.find_codec_by_id(@codec_id)
    records_array = codec.compress(records_array)
  end
  encoder.write(records_array)

  buffer.string
end

#fulfill_relative_dataObject


126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/kafka/protocol/record_batch.rb', line 126

# Derives the batch-relative metadata from the records themselves:
# base/max timestamps, per-record offset and timestamp deltas, and the
# last offset delta. An empty batch falls back to Time.now timestamps.
def fulfill_relative_data
  earliest, latest = records.minmax_by(&:create_time)
  @first_timestamp = earliest.nil? ? Time.now : earliest.create_time
  @max_timestamp = latest.nil? ? Time.now : latest.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    # Timestamp deltas are stored as milliseconds relative to the base.
    record.timestamp_delta = ((record.create_time - first_timestamp) * 1000).to_i
  end
  # -1 for an empty batch, matching the records.length - 1 convention.
  @last_offset_delta = records.length - 1
end

#last_offsetObject


60
61
62
# File 'lib/kafka/protocol/record_batch.rb', line 60

# Absolute offset of the final record in the batch.
def last_offset
  @last_offset_delta + @first_offset
end

#mark_control_recordObject


215
216
217
218
219
220
# File 'lib/kafka/protocol/record_batch.rb', line 215

# Tags the first record as a control record when this is a transactional
# control batch; a no-op otherwise or when the batch is empty.
def mark_control_record
  return unless in_transaction && is_control_batch

  head = @records.first
  head.is_control_record = true unless head.nil?
end

#sizeObject


56
57
58
# File 'lib/kafka/protocol/record_batch.rb', line 56

# Number of records contained in this batch.
def size
  @records.length
end