Class: Kafka::Compressor

Inherits:
Object
Defined in:
lib/kafka/compressor.rb

Overview

Compresses message sets using a specified codec.

A message set is only compressed if it contains at least as many messages as the configured threshold.

Instrumentation

Whenever a message set is compressed, the notification compress.compressor.kafka will be emitted with the following payload:

  • message_count – the number of messages in the message set.
  • uncompressed_bytesize – the byte size of the original data.
  • compressed_bytesize – the byte size of the compressed data.
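
As an illustration of the payload above, here is a minimal, self-contained sketch. It does not use ruby-kafka itself: `RecordingInstrumenter` and `compress_with_notification` are hypothetical stand-ins, and gzip from Ruby's standard `Zlib` is assumed as the codec. Only the event name and the three payload keys are taken from this page.

```ruby
require "zlib"

# Hypothetical stand-in for an instrumenter: it yields a mutable payload
# hash to the block and records the event once the block returns.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name)
    payload = {}
    result = yield payload
    @events << [name, payload]
    result
  end
end

# Hypothetical helper: compresses already-encoded message data and fills in
# the payload keys documented above.
def compress_with_notification(instrumenter, encoded_data, message_count)
  instrumenter.instrument("compress.compressor.kafka") do |payload|
    compressed = Zlib.gzip(encoded_data)

    payload[:message_count] = message_count
    payload[:uncompressed_bytesize] = encoded_data.bytesize
    payload[:compressed_bytesize] = compressed.bytesize

    compressed
  end
end

instrumenter = RecordingInstrumenter.new
data = "hello kafka " * 100
compressed = compress_with_notification(instrumenter, data, 100)

name, payload = instrumenter.events.first
puts name
puts payload.inspect
```

Comparing `uncompressed_bytesize` with `compressed_bytesize` in a subscriber is one way to judge whether the chosen codec and threshold pay off for a given workload.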

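Instance Method Summary

  • #compress(message_set) ⇒ Protocol::MessageSet
  • #initialize(codec_name: nil, threshold: 1, instrumenter:) ⇒ Compressor (constructor)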

Constructor Details

#initialize(codec_name: nil, threshold: 1, instrumenter:) ⇒ Compressor

Returns a new instance of Compressor.

Parameters:

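  • codec_name (Symbol, nil) (defaults to: nil)

    the name of the codec to use. If no codec is resolved, #compress returns message sets unchanged.

  • threshold (Integer) (defaults to: 1)

    the minimum number of messages in a message set that will trigger compression.

  • instrumenter (required, no default)

    the object used to emit the instrumentation notification.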



# File 'lib/kafka/compressor.rb', line 23

def initialize(codec_name: nil, threshold: 1, instrumenter:)
  @codec = Compression.find_codec(codec_name)
  @threshold = threshold
  @instrumenter = instrumenter
end

Instance Method Details

#compress(message_set) ⇒ Protocol::MessageSet

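Parameters:

  • message_set (Protocol::MessageSet)

Returns:

  • (Protocol::MessageSet)

    the original message set if no codec is configured or the set holds fewer messages than the threshold; otherwise a new message set containing a single wrapper message that carries the compressed data.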



# File 'lib/kafka/compressor.rb', line 31

def compress(message_set)
  return message_set if @codec.nil? || message_set.size < @threshold

  compressed_data = compress_data(message_set)

  wrapper_message = Protocol::Message.new(
    value: compressed_data,
    codec_id: @codec.codec_id,
  )

  Protocol::MessageSet.new(messages: [wrapper_message])
end
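
The control flow of #compress can be tried in isolation with the sketch below. It is not the library's code: `Message`, `MessageSet`, `GzipCodec`, and `SketchCompressor` are hypothetical stand-ins, the messages are encoded by simple joining rather than the Kafka wire format, and the codec's `compress` method is an assumption (the listing above only shows `codec_id`).

```ruby
require "zlib"

# Hypothetical stand-ins for Protocol::Message and Protocol::MessageSet.
Message = Struct.new(:value, :codec_id, keyword_init: true)

class MessageSet
  attr_reader :messages

  def initialize(messages:)
    @messages = messages
  end

  def size
    @messages.size
  end
end

# Hypothetical codec. `codec_id` mirrors the call in the listing above;
# `compress` is an assumed interface.
class GzipCodec
  def codec_id
    1 # gzip's codec id in the Kafka protocol
  end

  def compress(data)
    Zlib.gzip(data)
  end
end

class SketchCompressor
  def initialize(codec: nil, threshold: 1)
    @codec = codec
    @threshold = threshold
  end

  def compress(message_set)
    # Pass the set through untouched when there is no codec or too few messages.
    return message_set if @codec.nil? || message_set.size < @threshold

    # Simplified encoding: real message sets use the Kafka wire format.
    encoded = message_set.messages.map(&:value).join("\n")

    wrapper = Message.new(value: @codec.compress(encoded), codec_id: @codec.codec_id)
    MessageSet.new(messages: [wrapper])
  end
end

small = MessageSet.new(messages: [Message.new(value: "a", codec_id: 0)])
large = MessageSet.new(messages: Array.new(5) { |i| Message.new(value: "msg-#{i}", codec_id: 0) })

compressor = SketchCompressor.new(codec: GzipCodec.new, threshold: 3)

unchanged = compressor.compress(small) # below the threshold: returned as-is
wrapped = compressor.compress(large)   # at or above the threshold: one wrapper message

puts unchanged.equal?(small)
puts wrapped.size
```

With a threshold of 3, the one-message set comes back as the same object, while the five-message set comes back as a set holding a single gzip-compressed wrapper message.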