Class: Kafka::RoundRobinAssignmentStrategy
- Inherits: Object
  - Object
  - Kafka::RoundRobinAssignmentStrategy
- Defined in: lib/kafka/round_robin_assignment_strategy.rb
Overview
A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.
Instance Method Summary
- #assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>
  Assign the topic partitions to the group members.
- #initialize(cluster:) ⇒ RoundRobinAssignmentStrategy (constructor)
  A new instance of RoundRobinAssignmentStrategy.
Constructor Details
#initialize(cluster:) ⇒ RoundRobinAssignmentStrategy
Returns a new instance of RoundRobinAssignmentStrategy.
    # File 'lib/kafka/round_robin_assignment_strategy.rb', line 8
    def initialize(cluster:)
      @cluster = cluster
    end
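The constructor only stores `cluster`. Judging from the `#assign` listing on this page, that object is expected to respond to `partitions_for(topic)` and return objects that respond to `partition_id`. The sketch below shows that shape with stand-ins; `StubPartition` and `StubCluster` are hypothetical names used for illustration and are not part of the library.

```ruby
# Hypothetical stand-ins, not part of ruby-kafka: they only mimic the two
# calls that #assign makes on the cluster object.
StubPartition = Struct.new(:partition_id)

class StubCluster
  # partition_counts maps a topic name to its number of partitions,
  # e.g. { "events" => 4 }.
  def initialize(partition_counts)
    @partition_counts = partition_counts
  end

  # Returns one object per partition, each responding to #partition_id.
  def partitions_for(topic)
    Array.new(@partition_counts.fetch(topic)) { |id| StubPartition.new(id) }
  end
end

cluster = StubCluster.new({ "events" => 4 })
partition_ids = cluster.partitions_for("events").map(&:partition_id)
```

A stand-in like this may be handy for exercising assignment logic in a unit test without a running broker, assuming the strategy touches nothing else on the cluster.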
Instance Method Details
#assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>
Assign the topic partitions to the group members.
    # File 'lib/kafka/round_robin_assignment_strategy.rb', line 18
    def assign(members:, topics:)
      group_assignment = {}

      members.each do |member_id|
        group_assignment[member_id] = Protocol::MemberAssignment.new
      end

      topics.each do |topic|
        partitions = @cluster.partitions_for(topic).map(&:partition_id)
        partitions_per_member = partitions.group_by {|partition_id|
          partition_id % members.count
        }.values

        members.zip(partitions_per_member).each do |member_id, member_partitions|
          unless member_partitions.nil?
            group_assignment[member_id].assign(topic, member_partitions)
          end
        end
      end

      group_assignment
    rescue Kafka::LeaderNotAvailable
      sleep 1
      retry
    end
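To make the grouping step easier to follow, here is a minimal sketch of the same modulo-and-zip idea on plain arrays and hashes. The helper name `round_robin` and its plain-hash return value are assumptions made for illustration; the real method returns `Protocol::MemberAssignment` objects and reads partitions from the cluster.

```ruby
# Plain-data sketch of the grouping step in #assign; no Kafka classes involved.
# `round_robin` is a hypothetical helper, not a library method.
def round_robin(members, partitions_by_topic)
  # Start every member with an empty assignment, as #assign does.
  assignment = members.each_with_object({}) { |member_id, acc| acc[member_id] = {} }

  partitions_by_topic.each do |topic, partition_ids|
    # Bucket partition ids by their remainder modulo the member count.
    buckets = partition_ids.group_by { |id| id % members.count }.values

    # Pair members with buckets in order; members past the last bucket get nil.
    members.zip(buckets).each do |member_id, ids|
      assignment[member_id][topic] = ids unless ids.nil?
    end
  end

  assignment
end

result = round_robin(%w[a b c], { "events" => [0, 1, 2, 3, 4], "audit" => [0] })
# "a" receives events [0, 3] and audit [0]; "b" receives events [1, 4];
# "c" receives events [2].
```

Because the pairing restarts at the first member for every topic, a single-partition topic such as "audit" always lands on the first member in this sketch.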