Class: Kafka::Compressor

Inherits:
Object
Defined in:
lib/kafka/compressor.rb

Overview

Compresses message sets using a specified codec.

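A message set is only compressed if a codec is configured and the set contains at least threshold messages. With the default threshold of 1, every non-empty message set is compressed.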
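The threshold rule can be sketched as a standalone predicate. This is a minimal illustration, not part of the ruby-kafka API: should_compress? is a hypothetical name, and it simply mirrors the guard at the top of #compress (no codec, or fewer than threshold messages, means no compression).

```ruby
# Hypothetical helper (not part of ruby-kafka) mirroring the guard in
# Kafka::Compressor#compress: compress only when a codec is configured
# AND the message set holds at least `threshold` messages.
def should_compress?(codec, message_count, threshold)
  return false if codec.nil?

  message_count >= threshold
end

should_compress?(:gzip, 5, 10)  # => false (below threshold)
should_compress?(:gzip, 10, 10) # => true  (threshold reached)
should_compress?(nil, 100, 1)   # => false (no codec configured)
```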

Instrumentation

Whenever a message set is compressed, the notification compress.compressor.kafka will be emitted with the following payload:

  • message_count – the number of messages in the message set.
  • uncompressed_bytesize – the byte size of the original data.
  • compressed_bytesize – the byte size of the compressed data.
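To show how the three payload fields relate to the data, here is a sketch using a hypothetical recording instrumenter (not ruby-kafka's own instrumenter) and Ruby's stdlib Zlib gzip as the codec. The assumption that the ".kafka" suffix is appended to the event name is made only so the recorded name matches the documented compress.compressor.kafka; the string being compressed stands in for an encoded message set.

```ruby
require "zlib"

# Hypothetical recording instrumenter (NOT ruby-kafka's instrumenter).
# It yields a mutable payload hash to the block, then stores the event name
# (with an assumed ".kafka" suffix) together with the filled-in payload.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    result = yield(payload)
    @events << ["#{name}.kafka", payload]
    result
  end
end

# Fills in the three documented payload fields around one compression call.
# `encoded` stands in for the serialized message set.
def compress_with_notification(instrumenter, encoded, message_count)
  instrumenter.instrument("compress.compressor") do |notification|
    compressed = Zlib.gzip(encoded)
    notification[:message_count] = message_count
    notification[:uncompressed_bytesize] = encoded.bytesize
    notification[:compressed_bytesize] = compressed.bytesize
    compressed
  end
end

inst = RecordingInstrumenter.new
data = "hello kafka " * 100
compress_with_notification(inst, data, 100)
name, payload = inst.events.first
puts name                            # prints compress.compressor.kafka
puts payload[:uncompressed_bytesize] # prints 1200
```

Because the payload is filled in inside the block, a subscriber sees the compressed size only after compression has finished.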


Constructor Details

#initialize(codec_name: nil, threshold: 1, instrumenter:) ⇒ Compressor

Returns a new instance of Compressor.

Parameters:

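  • codec_name (Symbol, nil) (defaults to: nil)

    the name of the compression codec to use; if nil, message sets are never compressed.
  • threshold (Integer) (defaults to: 1)

    the minimum number of messages in a message set that will trigger compression.
  • instrumenter (required)

    the instrumenter used to emit the compress.compressor.kafka notification.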

# File 'lib/kafka/compressor.rb', line 25

def initialize(codec_name: nil, threshold: 1, instrumenter:)
  # Codec may be nil, in which case we won't compress.
  @codec = codec_name && Compression.find_codec(codec_name)

  @threshold = threshold
  @instrumenter = instrumenter
end
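The constructor relies on Ruby's short-circuiting && so that a nil codec_name skips the codec lookup entirely and leaves @codec as nil. The sketch below isolates that idiom; CODECS, find_codec and resolve_codec are hypothetical stand-ins for Compression.find_codec, and the gzip codec id of 1 is an assumption used only for illustration.

```ruby
# Hypothetical codec registry standing in for Compression.find_codec.
# The real registry's contents and error behaviour may differ.
Codec = Struct.new(:name, :codec_id)

CODECS = {
  gzip: Codec.new(:gzip, 1), # codec id assumed for illustration
}.freeze

def find_codec(name)
  CODECS.fetch(name) { raise ArgumentError, "unknown codec: #{name.inspect}" }
end

# Same idiom as the constructor: `nil && ...` evaluates to nil without
# calling find_codec, so compression stays disabled.
def resolve_codec(codec_name)
  codec_name && find_codec(codec_name)
end

resolve_codec(nil)   # => nil
resolve_codec(:gzip) # => #<struct Codec name=:gzip, codec_id=1>
```

One consequence of this design is that an unknown codec name fails at construction time rather than on the first call to #compress.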

Instance Method Details

#compress(message_set, offset: -1) ⇒ Protocol::MessageSet

Parameters:

  • message_set (Protocol::MessageSet)
  • offset (Integer) (defaults to: -1)

    the offset assigned to the wrapper message; used to simulate broker behaviour in tests.

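Returns:

  • (Protocol::MessageSet)

    the original message set, unchanged, if no codec is configured or the set contains fewer than threshold messages; otherwise a new message set holding a single wrapper message whose value is the compressed data.
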
# File 'lib/kafka/compressor.rb', line 36

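def compress(message_set, offset: -1)
  # Pass the set through untouched when compression is disabled or the
  # set is too small to trigger compression.
  return message_set if @codec.nil? || message_set.size < @threshold

  # Encode and compress the whole set via the private compress_data
  # helper (not shown on this page).
  compressed_data = compress_data(message_set)

  # Wrap the compressed bytes in a single message tagged with the codec id,
  # so the receiving side can tell which codec to decompress with.
  wrapper_message = Protocol::Message.new(
    value: compressed_data,
    codec_id: @codec.codec_id,
    offset: offset
  )

  Protocol::MessageSet.new(messages: [wrapper_message])
end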
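The wrapping step can be sketched end to end with simplified stand-ins. StubMessage, StubMessageSet, GZIP_CODEC_ID and wrap_compressed are hypothetical; Marshal and Zlib gzip replace Kafka's real wire encoding and codec, and only the message values are serialized. The point is the shape of the result: a set of N messages becomes a set of one wrapper message carrying the compressed bytes.

```ruby
require "zlib"

# Hypothetical stand-ins for Protocol::Message and Protocol::MessageSet.
StubMessage = Struct.new(:value, :codec_id, :offset, keyword_init: true)
StubMessageSet = Struct.new(:messages, keyword_init: true) do
  def size
    messages.size
  end
end

GZIP_CODEC_ID = 1 # assumed codec id, for illustration only

# Sketch of #compress with a fixed gzip codec: small sets pass through,
# larger ones are serialized, compressed, and wrapped in one message.
def wrap_compressed(message_set, threshold:, offset: -1)
  return message_set if message_set.size < threshold

  payload = Marshal.dump(message_set.messages.map(&:value))
  wrapper = StubMessage.new(value: Zlib.gzip(payload), codec_id: GZIP_CODEC_ID, offset: offset)
  StubMessageSet.new(messages: [wrapper])
end

set = StubMessageSet.new(
  messages: %w[a b c].map { |v| StubMessage.new(value: v, codec_id: 0, offset: -1) }
)
out = wrap_compressed(set, threshold: 2)
out.size                                            # => 1
Marshal.load(Zlib.gunzip(out.messages.first.value)) # => ["a", "b", "c"]
wrap_compressed(set, threshold: 10).equal?(set)     # => true
```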