Class: Kafka::RoundRobinAssignmentStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/round_robin_assignment_strategy.rb

Overview

A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.

Instance Method Summary collapse

Constructor Details

#initialize(cluster:) ⇒ RoundRobinAssignmentStrategy

Returns a new instance of RoundRobinAssignmentStrategy


10
11
12
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 10

# Builds a strategy bound to the given cluster.
#
# The cluster is stored and later queried via `partitions_for(topic)` when
# partitions are assigned.
#
# @param cluster [Object] object responding to `partitions_for(topic)`
#   (presumably a Kafka::Cluster; confirm against the caller)
def initialize(cluster:)
  @cluster = cluster
end

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>

Assign the topic partitions to the group members.

Parameters:

  • members (Array<String>)

    member ids

  • topics (Array<String>)

    names of the topics whose partitions should be assigned

Returns:

  • (Hash<String, Protocol::MemberAssignment>)

    a hash mapping member ids to assignments


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 20

# Assign the topic partitions to the group members.
#
# The partitions of every topic are flattened into a single list (topics in
# the given order, partitions in the order the cluster reports them) and
# dealt out one at a time: the partition at position i goes to the member at
# position i modulo the number of members. Members left without a partition
# still get an (empty) assignment.
#
# @param members [Array<String>] member ids
# @param topics [Array<String>] topic names
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
#   ids to assignments; empty when `members` is empty
# @raise [UnknownTopicOrPartition] if the cluster does not know a topic; the
#   message names the offending topic
def assign(members:, topics:)
  group_assignment = members.each_with_object({}) do |member_id, assignments|
    assignments[member_id] = Protocol::MemberAssignment.new
  end

  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      # Re-raise with the topic name so the failure is actionable.
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    partition_ids.map { |partition_id| [topic, partition_id] }
  end

  # Walk the flat list with a stride of members.size, starting at each
  # member's own position. Unlike computing `index % members.count`, this
  # never divides, so an empty member list yields an empty result instead of
  # raising ZeroDivisionError.
  members.each_with_index do |member_id, offset|
    member_assignment = group_assignment[member_id]
    (offset...topic_partitions.size).step(members.size) do |index|
      topic, partition = topic_partitions[index]
      member_assignment.assign(topic, [partition])
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  # NOTE(review): this retries forever, once per second, until a leader is
  # available. Left as-is because bounding it would surface a new exception
  # to callers; consider a retry limit.
  sleep 1
  retry
end