Class: Kafka::RoundRobinAssignmentStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/round_robin_assignment_strategy.rb

Overview

A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.

Instance Method Summary collapse

Constructor Details

#initialize(cluster:) ⇒ RoundRobinAssignmentStrategy

Returns a new instance of RoundRobinAssignmentStrategy.



8
9
10
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 8

# Builds a strategy bound to the given cluster.
#
# @param cluster [Object] stored as-is in @cluster; #assign calls
#   `partitions_for(topic)` on it to look up each topic's partitions.
def initialize(cluster:)
  @cluster = cluster
end

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>

Assign the topic partitions to the group members.

Parameters:

  • members (Array<String>)

    member ids

  • topics (Array<String>)

    names of the topics whose partitions are to be assigned

Returns:

  • (Hash<String, Protocol::MemberAssignment>)

    a hash mapping member ids to assignments



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 18

# Assign the topic partitions to the group members.
#
# Partitions are dealt out one at a time, member after member, in the order
# the cluster reports them. The rotation carries over from one topic to the
# next, so a subscription made of many small topics is spread over all
# members instead of piling every partition onto the first one.
#
# @param members [Array<String>] member ids
# @param topics [Array<String>] topics
# @return [Hash<String, Protocol::MemberAssignment>] one entry per member id;
#   empty when `members` is empty.
# @raise [UnknownTopicOrPartition] re-raised with the topic name in the
#   message when `@cluster.partitions_for` raises it for a topic.
def assign(members:, topics:)
  group_assignment = {}

  members.each do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end

  member_count = members.count

  # Position (within `members`) of the member that receives the next
  # partition. Deliberately NOT reset between topics.
  next_slot = 0

  topics.each do |topic|
    begin
      partitions = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end

    # Nobody to hand partitions to; also avoids a modulo-by-zero below.
    next if member_count.zero?

    # Bucket by position in the rotation rather than by partition id, so
    # sparse or unordered partition ids are still spread evenly.
    partitions_per_slot = {}
    partitions.each do |partition_id|
      (partitions_per_slot[next_slot] ||= []) << partition_id
      next_slot = (next_slot + 1) % member_count
    end

    # One `assign` call per (member, topic), carrying all of that member's
    # partitions for the topic; members that got none are skipped.
    members.each_with_index do |member_id, slot|
      member_partitions = partitions_per_slot[slot]
      group_assignment[member_id].assign(topic, member_partitions) unless member_partitions.nil?
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  # Restart the whole assignment from scratch after a short pause.
  # NOTE(review): this retry has no upper bound, so the call blocks for as
  # long as the error keeps being raised.
  sleep 1
  retry
end