Class: Kafka::ConsumerGroup::Assignor

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/consumer_group/assignor.rb

Overview

A consumer group partition assignor

Defined Under Namespace

Classes: Partition

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, strategy:) ⇒ Assignor



15
16
17
18
# File 'lib/kafka/consumer_group/assignor.rb', line 15

def initialize(cluster:, strategy:)
  @cluster = cluster
  @strategy = strategy
end

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>

Assign the topic partitions to the group members.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/kafka/consumer_group/assignor.rb', line 35

def assign(members:, topics:)
  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
  end

  group_assignment = {}

  members.each_key do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end
  @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
    Array(partitions).each do |partition|
      group_assignment[member_id].assign(partition.topic, [partition.partition_id])
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  sleep 1
  retry
end

#protocol_nameObject



20
21
22
# File 'lib/kafka/consumer_group/assignor.rb', line 20

def protocol_name
  @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
end

#user_dataObject



24
25
26
# File 'lib/kafka/consumer_group/assignor.rb', line 24

def user_data
  @strategy.user_data if @strategy.respond_to?(:user_data)
end