Class: Kafka::Partitioner

Inherits:
Object
Defined in:
lib/kafka/partitioner.rb

Overview

Assigns partitions to messages.

Instance Method Summary

Constructor Details

#initialize(hash_function: nil) ⇒ Partitioner

Returns a new instance of Partitioner.

Parameters:

  • hash_function (Symbol, nil) (defaults to: nil)

    the algorithm used to compute a message's destination partition. Defaults to :crc32.



# File 'lib/kafka/partitioner.rb', line 11

def initialize(hash_function: nil)
  @digest = Digest.find_digest(hash_function || :crc32)
end
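
The constructor resolves a symbol to a digest object and falls back to :crc32 when none is given. The lookup itself is not shown on this page, so the following is only a minimal sketch of how such a lookup could behave. `SketchDigest`, `SketchDigest::Crc32`, and `SketchPartitioner` are hypothetical stand-ins, not the library's own classes, and CRC32 is taken from Ruby's standard `Zlib` module.

```ruby
require "zlib"

# Hypothetical stand-in for the digest lookup used by the constructor.
# This is NOT the library's implementation, only an illustration.
module SketchDigest
  # Exposes the #hash(key) interface that the partitioner calls on its digest.
  module Crc32
    def self.hash(value)
      Zlib.crc32(value)
    end
  end

  # Assumed registry of supported hash functions, keyed by symbol.
  FUNCTIONS = { crc32: Crc32 }.freeze

  # Returns the digest registered under +name+, or raises if it is unknown.
  def self.find_digest(name)
    FUNCTIONS.fetch(name) do
      raise ArgumentError, "Unknown hash function #{name}"
    end
  end
end

# Hypothetical partitioner that mirrors the constructor shown in the listing.
class SketchPartitioner
  attr_reader :digest

  def initialize(hash_function: nil)
    # nil falls back to :crc32, matching the documented default.
    @digest = SketchDigest.find_digest(hash_function || :crc32)
  end
end

default_partitioner  = SketchPartitioner.new
explicit_partitioner = SketchPartitioner.new(hash_function: :crc32)
puts default_partitioner.digest.equal?(explicit_partitioner.digest)
```

In this sketch, omitting `hash_function` and passing `:crc32` explicitly select the same digest object, while an unregistered symbol raises `ArgumentError`.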

Instance Method Details

#call(partition_count, message) ⇒ Integer

Assigns a partition number based on a partition key. If no explicit partition key is provided, the message key will be used instead.

If the key is nil, then a random partition is selected. Otherwise, a digest of the key is used to deterministically find a partition. As long as the number of partitions doesn't change, the same key will always be assigned to the same partition.
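
To illustrate why the mapping is stable only while the partition count stays fixed, here is a small sketch that counts how many keys land on a different partition when the count changes. It assumes CRC32 (via `Zlib.crc32`) as the digest. The helper names `stable_partition_for` and `count_moved_keys` and the sample keys are made up for the example.

```ruby
require "zlib"

# Assumption: CRC32 from Zlib stands in for the configured digest.
def stable_partition_for(key, partition_count)
  Zlib.crc32(key) % partition_count
end

# Counts the keys whose partition differs between two partition counts.
def count_moved_keys(keys, old_count, new_count)
  keys.count do |key|
    stable_partition_for(key, old_count) != stable_partition_for(key, new_count)
  end
end

sample_keys = (1..100).map { |i| "user-#{i}" }

# With an unchanged partition count no key moves.
puts "moved when staying at 6 partitions: #{count_moved_keys(sample_keys, 6, 6)}"

# After a resize, any key whose digest leaves a different remainder is remapped.
puts "moved when growing from 6 to 8: #{count_moved_keys(sample_keys, 6, 8)}"
```

The first count is always zero. The second depends on the keys, since each key moves whenever its digest has a different remainder modulo the new count.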

Parameters:

  • partition_count (Integer)

    the number of partitions in the topic.

  • message (Kafka::PendingMessage)

    the message that should be assigned a partition.

Returns:

  • (Integer)

    the partition number.

Raises:

  • (ArgumentError)

    if partition_count is zero.


# File 'lib/kafka/partitioner.rb', line 27

def call(partition_count, message)
  raise ArgumentError if partition_count == 0

  # If no explicit partition key is specified we use the message key instead.
  key = message.partition_key || message.key

  if key.nil?
    rand(partition_count)
  else
    @digest.hash(key) % partition_count
  end
end
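
The three cases handled by #call (explicit partition key, message key only, and no key at all) can be exercised with a self-contained sketch. `DemoMessage` is a hypothetical stand-in for Kafka::PendingMessage that carries only the two fields the method reads, and `DemoCrc32` and `DemoPartitioner` are illustrative names, with CRC32 assumed as the digest.

```ruby
require "zlib"

# Hypothetical stand-in for Kafka::PendingMessage; only the fields #call reads.
DemoMessage = Struct.new(:key, :partition_key, keyword_init: true)

# Hypothetical digest exposing the #hash(key) interface used in the listing.
module DemoCrc32
  def self.hash(value)
    Zlib.crc32(value)
  end
end

# Illustrative re-creation of the partitioning logic, not the library class.
class DemoPartitioner
  def initialize(digest: DemoCrc32)
    @digest = digest
  end

  def call(partition_count, message)
    raise ArgumentError, "partition_count must not be zero" if partition_count == 0

    # An explicit partition key takes precedence over the message key.
    key = message.partition_key || message.key

    if key.nil?
      rand(partition_count)               # no key: pick any partition
    else
      @digest.hash(key) % partition_count # keyed: deterministic partition
    end
  end
end

partitioner = DemoPartitioner.new
keyed   = DemoMessage.new(key: "order-42")
routed  = DemoMessage.new(key: "order-42", partition_key: "customer-7")
keyless = DemoMessage.new

puts "keyed:   #{partitioner.call(8, keyed)}"
puts "routed:  #{partitioner.call(8, routed)}"
puts "keyless: #{partitioner.call(8, keyless)}"
```

Here `routed` is placed by "customer-7" rather than by its message key, and `keyless` may land on a different partition on every call.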