Class: Kafka::Protocol::RecordBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/record_batch.rb

Constant Summary collapse

MAGIC_BYTE =
2
RECORD_BATCH_OVERHEAD =

The size of metadata before the real record data

49
CODEC_ID_MASK =

Masks to extract information from attributes

0b00000111
IN_TRANSACTION_MASK =
0b00010000
IS_CONTROL_BATCH_MASK =
0b00100000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(records: [], first_offset: 0, first_timestamp: Time.now, partition_leader_epoch: 0, codec_id: 0, in_transaction: false, is_control_batch: false, last_offset_delta: 0, producer_id: -1, producer_epoch: 0, first_sequence: 0, max_timestamp: Time.now) ⇒ RecordBatch

Returns a new instance of RecordBatch.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/kafka/protocol/record_batch.rb', line 19

# Builds a new record batch.
#
# All arguments are keyword arguments with producer-friendly defaults:
# an empty record list, base offset/sequence of zero, no compression
# (codec_id 0), and non-transactional, non-control semantics.
def initialize(
    records: [],
    first_offset: 0,
    first_timestamp: Time.now,
    partition_leader_epoch: 0,
    codec_id: 0,
    in_transaction: false,
    is_control_batch: false,
    last_offset_delta: 0,
    producer_id: -1,
    producer_epoch: 0,
    first_sequence: 0,
    max_timestamp: Time.now
)
  # Coerce nil / single record into an array so the rest of the class
  # can assume an Array.
  @records = Array(records)

  # Assignments kept in parameter order for easy cross-checking.
  @first_offset = first_offset
  @first_timestamp = first_timestamp
  @partition_leader_epoch = partition_leader_epoch
  @codec_id = codec_id
  @in_transaction = in_transaction
  @is_control_batch = is_control_batch
  @last_offset_delta = last_offset_delta
  @producer_id = producer_id
  @producer_epoch = producer_epoch
  @first_sequence = first_sequence
  @max_timestamp = max_timestamp

  # Flag the first record when this is a transactional control batch.
  mark_control_record
end

Instance Attribute Details

#codec_idObject

Returns the value of attribute codec_id.



17
18
19
# File 'lib/kafka/protocol/record_batch.rb', line 17

def codec_id
  @codec_id
end

#first_offsetObject (readonly)

Returns the value of attribute first_offset.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def first_offset
  @first_offset
end

#first_sequenceObject (readonly)

Returns the value of attribute first_sequence.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def first_sequence
  @first_sequence
end

#first_timestampObject (readonly)

Returns the value of attribute first_timestamp.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def first_timestamp
  @first_timestamp
end

#in_transactionObject (readonly)

Returns the value of attribute in_transaction.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def in_transaction
  @in_transaction
end

#is_control_batchObject (readonly)

Returns the value of attribute is_control_batch.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def is_control_batch
  @is_control_batch
end

#last_offset_deltaObject (readonly)

Returns the value of attribute last_offset_delta.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def last_offset_delta
  @last_offset_delta
end

#max_timestampObject (readonly)

Returns the value of attribute max_timestamp.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def max_timestamp
  @max_timestamp
end

#partition_leader_epochObject (readonly)

Returns the value of attribute partition_leader_epoch.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def partition_leader_epoch
  @partition_leader_epoch
end

#producer_epochObject (readonly)

Returns the value of attribute producer_epoch.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def producer_epoch
  @producer_epoch
end

#producer_idObject (readonly)

Returns the value of attribute producer_id.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def producer_id
  @producer_id
end

#recordsObject (readonly)

Returns the value of attribute records.



15
16
17
# File 'lib/kafka/protocol/record_batch.rb', line 15

def records
  @records
end

Class Method Details

.decode(decoder) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/kafka/protocol/record_batch.rb', line 150

# Decodes a v2 RecordBatch from the given decoder and returns a new
# RecordBatch instance.
#
# Raises InsufficientDataMessage when the payload is truncated or the
# decoded record count does not match the length field.
def self.decode(decoder)
  first_offset = decoder.int64

  # The batch body is length-prefixed; read it whole so the records
  # payload size can be derived from the raw string below.
  record_batch_raw = decoder.bytes
  record_batch_decoder = Decoder.from_string(record_batch_raw)

  partition_leader_epoch = record_batch_decoder.int32
  # Currently, the magic byte is used to distinguish legacy MessageSet and
  # RecordBatch. Therefore, we don't care about magic byte here yet.
  _magic_byte = record_batch_decoder.int8
  # NOTE(review): the CRC is read but never verified against the payload.
  _crc = record_batch_decoder.int32

  # Codec id and the transaction/control flags are packed into the
  # 16-bit attributes field (see the *_MASK constants).
  attributes = record_batch_decoder.int16
  codec_id = attributes & CODEC_ID_MASK
  in_transaction = (attributes & IN_TRANSACTION_MASK) > 0
  is_control_batch = (attributes & IS_CONTROL_BATCH_MASK) > 0

  last_offset_delta = record_batch_decoder.int32
  # Timestamps arrive as milliseconds. NOTE(review): integer division by
  # 1000 truncates sub-second precision — confirm this loss is intended.
  first_timestamp = Time.at(record_batch_decoder.int64 / 1000)
  max_timestamp = Time.at(record_batch_decoder.int64 / 1000)

  producer_id = record_batch_decoder.int64
  producer_epoch = record_batch_decoder.int16
  first_sequence = record_batch_decoder.int32

  records_array_length = record_batch_decoder.int32
  # Everything after the fixed-size header is the (possibly compressed)
  # records payload.
  records_array_raw = record_batch_decoder.read(
    record_batch_raw.size - RECORD_BATCH_OVERHEAD
  )
  if codec_id != 0
    codec = Compression.find_codec_by_id(codec_id)
    records_array_raw = codec.decompress(records_array_raw)
  end

  records_array_decoder = Decoder.from_string(records_array_raw)
  records_array = []
  until records_array_decoder.eof?
    record = Record.decode(records_array_decoder)
    # Per-record offsets/timestamps are stored as deltas from the batch
    # header; materialize the absolute values on each record.
    record.offset = first_offset + record.offset_delta
    record.create_time = first_timestamp + record.timestamp_delta
    records_array << record
  end

  raise InsufficientDataMessage if records_array.length != records_array_length

  # NOTE(review): codec_id is not forwarded here, so the returned batch
  # reports itself as uncompressed — presumably because the records were
  # decompressed above; verify this is intentional.
  new(
    records: records_array,
    first_offset: first_offset,
    first_timestamp: first_timestamp,
    partition_leader_epoch: partition_leader_epoch,
    in_transaction: in_transaction,
    is_control_batch: is_control_batch,
    last_offset_delta: last_offset_delta,
    producer_id: producer_id,
    producer_epoch: producer_epoch,
    first_sequence: first_sequence,
    max_timestamp: max_timestamp
  )
rescue EOFError
  raise InsufficientDataMessage, 'Partial trailing record detected!'
end

Instance Method Details

#==(other) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/kafka/protocol/record_batch.rb', line 138

# Value equality: two batches compare equal when their records and batch
# metadata match. codec_id and the timestamps are not part of the
# comparison (matching the original behavior).
#
# Returns false — instead of raising NoMethodError — when +other+ is not
# a RecordBatch, per Ruby's convention for #==.
def ==(other)
  return false unless other.is_a?(RecordBatch)

  records == other.records &&
    first_offset == other.first_offset &&
    partition_leader_epoch == other.partition_leader_epoch &&
    in_transaction == other.in_transaction &&
    is_control_batch == other.is_control_batch &&
    last_offset_delta == other.last_offset_delta &&
    producer_id == other.producer_id &&
    producer_epoch == other.producer_epoch &&
    first_sequence == other.first_sequence
end

#attributesObject



62
63
64
65
66
# File 'lib/kafka/protocol/record_batch.rb', line 62

# Packs the codec id and the transaction/control flags into the 16-bit
# attributes field of the batch header.
def attributes
  flags = @codec_id
  flags |= IN_TRANSACTION_MASK if @in_transaction
  flags |= IS_CONTROL_BATCH_MASK if @is_control_batch
  flags
end

#compressed?Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/kafka/protocol/record_batch.rb', line 120

# True when a compression codec is configured (codec id 0 means none).
def compressed?
  !@codec_id.zero?
end

#encode(encoder) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/kafka/protocol/record_batch.rb', line 68

# Writes this batch to +encoder+ in the v2 wire format: the base offset,
# then the length-prefixed batch (partition leader epoch, magic byte,
# CRC-32C of the body, body).
def encode(encoder)
  encoder.write_int64(@first_offset)

  # Serialize the body first so its checksum can be embedded in the
  # batch header.
  body = encode_record_batch_body
  crc = Digest::CRC32c.checksum(body)

  batch_io = StringIO.new
  batch_encoder = Encoder.new(batch_io)
  batch_encoder.write_int32(@partition_leader_epoch)
  batch_encoder.write_int8(MAGIC_BYTE)
  batch_encoder.write_int32(crc)
  batch_encoder.write(body)

  encoder.write_bytes(batch_io.string)
end

#encode_record_arrayObject



111
112
113
114
115
116
117
118
# File 'lib/kafka/protocol/record_batch.rb', line 111

# Serializes all records into a single binary string by letting each
# record encode itself into a shared encoder.
def encode_record_array
  io = StringIO.new
  record_encoder = Encoder.new(io)
  @records.each { |record| record.encode(record_encoder) }
  io.string
end

#encode_record_batch_bodyObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/kafka/protocol/record_batch.rb', line 86

# Serializes the CRC-covered portion of the batch: attributes, offset and
# timestamp metadata, producer/transaction fields, record count, and the
# (possibly compressed) records payload.
def encode_record_batch_body
  # Prepare the records payload up front, compressing if a codec is set.
  payload = encode_record_array
  if compressed?
    payload = Compression.find_codec_by_id(@codec_id).compress(payload)
  end

  io = StringIO.new
  body_encoder = Encoder.new(io)

  body_encoder.write_int16(attributes)
  body_encoder.write_int32(@last_offset_delta)
  # Timestamps go over the wire as epoch milliseconds.
  body_encoder.write_int64((@first_timestamp.to_f * 1000).to_i)
  body_encoder.write_int64((@max_timestamp.to_f * 1000).to_i)

  body_encoder.write_int64(@producer_id)
  body_encoder.write_int16(@producer_epoch)
  body_encoder.write_int32(@first_sequence)

  body_encoder.write_int32(@records.length)
  body_encoder.write(payload)

  io.string
end

#fulfill_relative_dataObject



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/kafka/protocol/record_batch.rb', line 124

# Recomputes batch-relative metadata from the records themselves:
# first/max timestamps, each record's offset and timestamp delta, and
# the last offset delta. Empty batches fall back to Time.now for both
# timestamps (and get a last offset delta of -1, as before).
def fulfill_relative_data
  earliest, latest = records.minmax_by(&:create_time)
  @first_timestamp = earliest.nil? ? Time.now : earliest.create_time
  @max_timestamp = latest.nil? ? Time.now : latest.create_time

  records.each_with_index do |record, index|
    record.offset_delta = index
    # Deltas are whole seconds relative to the first timestamp.
    record.timestamp_delta = (record.create_time - first_timestamp).to_i
  end
  @last_offset_delta = records.length - 1
end

#last_offsetObject



58
59
60
# File 'lib/kafka/protocol/record_batch.rb', line 58

# Absolute offset of the final record: the base offset plus the last
# offset delta.
def last_offset
  @last_offset_delta + @first_offset
end

#mark_control_recordObject



212
213
214
215
216
217
# File 'lib/kafka/protocol/record_batch.rb', line 212

# Flags the batch's first record as a control record, but only when the
# batch is both transactional and a control batch. Safe on empty batches.
def mark_control_record
  return unless in_transaction && is_control_batch

  first = @records.first
  first.is_control_record = true unless first.nil?
end

#sizeObject



54
55
56
# File 'lib/kafka/protocol/record_batch.rb', line 54

# Number of records contained in this batch.
def size
  @records.length
end