Class: Kafka::RoundRobinAssignmentStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/round_robin_assignment_strategy.rb

Overview

A round robin assignment strategy inpired on the original java client round robin assignor. It's capable of handling identical as well as different topic subscriptions accross the same consumer group.

Instance Method Summary collapse

Instance Method Details

#call(cluster:, members:, partitions:) ⇒ Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash mapping member ids to partitions.

Assign the topic partitions to the group members.

Parameters:

Returns:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 21

def call(cluster:, members:, partitions:)
  partitions_per_member = Hash.new {|h, k| h[k] = [] }
  relevant_partitions = valid_sorted_partitions(members, partitions)
  members_ids = members.keys
  iterator = (0...members.size).cycle
  idx = iterator.next

  relevant_partitions.each do |partition|
    topic = partition.topic

    while !members[members_ids[idx]].topics.include?(topic)
      idx = iterator.next
    end

    partitions_per_member[members_ids[idx]] << partition
    idx = iterator.next
  end

  partitions_per_member
end

#protocol_nameObject



8
9
10
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 8

def protocol_name
  "roundrobin"
end

#valid_sorted_partitions(members, partitions) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/kafka/round_robin_assignment_strategy.rb', line 42

def valid_sorted_partitions(members, partitions)
  subscribed_topics = members.map do |id, |
     && .topics
  end.flatten.compact

  partitions
    .select { |partition| subscribed_topics.include?(partition.topic) }
    .sort_by { |partition| partition.topic }
end