Class: Kafka::RoundRobinAssignmentStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/round_robin_assignment_strategy.rb

Overview

A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.

Instance Method Summary

Constructor Details

#initialize(cluster:) ⇒ RoundRobinAssignmentStrategy



10
11
12
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 10

# Creates a strategy bound to the given cluster.
#
# @param cluster [#partitions_for] kept in @cluster; #assign asks it for the
#   partitions of each topic via `partitions_for(topic)`.
def initialize(cluster:)
  @cluster = cluster
end

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>

Assign the topic partitions to the group members.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 20

# Assign the topic partitions to the group members.
#
# Every partition of every requested topic is collected into one flat list
# (topic by topic, in the order the cluster returns them) and dealt out to
# the members in turn: entry 0 goes to the first member, entry 1 to the
# second, and so on, wrapping around once every member has received one.
#
# @param members [Enumerable<String>] ids of the consumer group members.
# @param topics [Enumerable<String>] names of the topics to distribute.
# @return [Hash<String, Protocol::MemberAssignment>] one assignment per member
#   id; members that receive no partition keep a freshly created assignment.
#   An empty `members` yields an empty hash.
# @raise [UnknownTopicOrPartition] if the cluster lookup for a topic raises
#   it; re-raised with a message that names the topic.
def assign(members:, topics:)
  member_ids = members.to_a

  group_assignment = member_ids.each_with_object({}) do |member_id, assignments|
    assignments[member_id] = Protocol::MemberAssignment.new
  end

  # Flat list of [topic, partition_id] pairs across all topics.
  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end

    partition_ids.map { |partition_id| [topic, partition_id] }
  end

  # With no members there is nobody to hand partitions to; return early
  # rather than dividing by zero in the modulo below.
  return group_assignment if member_ids.empty?

  member_count = member_ids.size
  topic_partitions.each_with_index do |(topic, partition), index|
    member_id = member_ids[index % member_count]
    group_assignment[member_id].assign(topic, [partition])
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  # Start the whole assignment over after a one-second pause. There is no
  # retry limit, so this blocks for as long as the error keeps being raised.
  sleep 1
  retry
end