Class: Kafka::Compressor

Inherits:
Object
Defined in:
lib/kafka/compressor.rb

Overview

Compresses message sets using a specified codec.

A message set is only compressed if it contains at least the configured threshold number of messages.

Instrumentation

Whenever a message set is compressed, the notification compress.compressor.kafka will be emitted with the following payload:

  • message_count – the number of messages in the message set.
  • uncompressed_bytesize – the byte size of the original data.
  • compressed_bytesize – the byte size of the compressed data.
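The payload fields listed above can be illustrated with a small, self-contained sketch. It does not use the library itself: `RecordingInstrumenter` and `compress_with_notification` are hypothetical names invented for this example, and gzip from Ruby's standard `zlib` stands in for whichever codec is configured.

```ruby
require "zlib"

# Hypothetical stand-in for an instrumenter: it records every event it is
# asked to instrument instead of forwarding it to a notification backend.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  # Yields a mutable payload hash to the block, then stores the event.
  def instrument(name, payload = {})
    result = yield payload if block_given?
    @events << [name, payload]
    result
  end
end

# Hypothetical helper (not part of the library): gzips a list of message
# strings and fills in the three payload fields described above.
def compress_with_notification(messages, instrumenter)
  instrumenter.instrument("compress.compressor.kafka") do |payload|
    data = messages.join
    compressed = Zlib.gzip(data)

    payload[:message_count] = messages.size
    payload[:uncompressed_bytesize] = data.bytesize
    payload[:compressed_bytesize] = compressed.bytesize

    compressed
  end
end

instrumenter = RecordingInstrumenter.new
compressed = compress_with_notification(["hello " * 50, "world " * 50], instrumenter)
name, payload = instrumenter.events.first
```

Comparing `payload[:compressed_bytesize]` with `payload[:uncompressed_bytesize]` in a subscriber is one way to track how effective the chosen codec is for a given workload.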

Constructor Details

#initialize(codec_name: nil, threshold: 1, instrumenter:) ⇒ Compressor

Returns a new instance of Compressor.

Parameters:

  • codec_name (Symbol, nil) (defaults to: nil)

    the name of the compression codec; if nil, nothing is compressed.

  • threshold (Integer) (defaults to: 1)

    the minimum number of messages in a message set that will trigger compression.



# File 'lib/kafka/compressor.rb', line 26

def initialize(codec_name: nil, threshold: 1, instrumenter:)
  # Codec may be nil, in which case we won't compress.
  @codec = codec_name && Compression.find_codec(codec_name)

  @threshold = threshold
  @instrumenter = instrumenter
end
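
How the two constructor arguments interact can be sketched in isolation. `SketchCompressor`, its `CODECS` table, and `compress?` are hypothetical and exist only for this example; the real class resolves codecs through `Compression.find_codec`. The `>=` comparison is an assumption based on the description of `threshold` as a minimum message count.

```ruby
require "zlib"

# Hypothetical, simplified model of the constructor logic shown above.
class SketchCompressor
  # Stand-in codec registry; the real lookup is Compression.find_codec.
  CODECS = { gzip: ->(data) { Zlib.gzip(data) } }.freeze

  attr_reader :codec

  def initialize(codec_name: nil, threshold: 1)
    # A nil codec_name short-circuits, leaving @codec nil (no compression).
    @codec = codec_name && CODECS.fetch(codec_name)
    @threshold = threshold
  end

  # Assumed rule: compress only when a codec is set and the message count
  # reaches the threshold.
  def compress?(message_count)
    !@codec.nil? && message_count >= @threshold
  end
end

no_codec = SketchCompressor.new
gzip_from_three = SketchCompressor.new(codec_name: :gzip, threshold: 3)
```

With these objects, `no_codec.compress?(10)` is false because no codec is configured, while `gzip_from_three.compress?(2)` is false and `gzip_from_three.compress?(3)` is true.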

Instance Attribute Details

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



# File 'lib/kafka/compressor.rb', line 21

def codec
  @codec
end

Instance Method Details

#compress(record_batch, offset: -1) ⇒ Protocol::RecordBatch

Parameters:

  • record_batch (Protocol::RecordBatch)
  • offset (Integer) (defaults to: -1)

    used to simulate broker behaviour in tests

Returns:

  • (Protocol::RecordBatch)



# File 'lib/kafka/compressor.rb', line 37

def compress(record_batch, offset: -1)
  if record_batch.is_a?(Protocol::RecordBatch)
    compress_record_batch(record_batch)
  else
    # Deprecated message set format
    compress_message_set(record_batch, offset)
  end
end