Class: Kazoo::ReplicaAssigner

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/replica_assigner.rb

Overview

Helper class to assign replicas to brokers for new partitions. It tries to e=venly divide the number of leaders and replicas over the brokers, in order to get a comparable load on all the brokers in the cluster.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster) ⇒ ReplicaAssigner

Returns a new instance of ReplicaAssigner.



9
10
11
12
# File 'lib/kazoo/replica_assigner.rb', line 9

def initialize(cluster)
  @cluster = cluster
  retrieve_initial_counts
end

Instance Attribute Details

#broker_leadersObject (readonly)

Returns the value of attribute broker_leaders.



7
8
9
# File 'lib/kazoo/replica_assigner.rb', line 7

def broker_leaders
  @broker_leaders
end

#broker_replicasObject (readonly)

Returns the value of attribute broker_replicas.



7
8
9
# File 'lib/kazoo/replica_assigner.rb', line 7

def broker_replicas
  @broker_replicas
end

#clusterObject (readonly)

Returns the value of attribute cluster.



6
7
8
# File 'lib/kazoo/replica_assigner.rb', line 6

def cluster
  @cluster
end

Instance Method Details

#assign(replication_factor) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/kazoo/replica_assigner.rb', line 41

def assign(replication_factor)
  raise Kazoo::ValidationError, "replication_factor should be higher than 0 " if replication_factor <= 0
  raise Kazoo::ValidationError, "replication_factor should not be higher than the number of brokers " if replication_factor > brokers.length

  # Order all brokers by the current number of leaders (ascending).
  # The first one will be the leader replica
  leader = @broker_leaders
    .to_a
    .sort_by { |pair| [pair[1], pair[0].id] }
    .map(&:first)
    .first

  # Update the current broker replica counts.
  # The assigned leader replica counts as a leader, but as a replica as well.
  @broker_leaders[leader] += 1
  @broker_replicas[leader] += 1

  # To assign the other replcias, we remove the broker that was selected as leader from
  # the list of brokers, and sort the rest by the number of replicas they are currently hosting.
  # Then, we take the number of remaining replcias to complete the replication factor.
  other_replicas = @broker_replicas
    .to_a
    .reject { |(key, _)| key == leader }
    .sort_by { |pair| [pair[1], pair[0].id] }
    .map(&:first)
    .take(replication_factor - 1)

  # Update the current broker replica counts.
  other_replicas.each { |broker| @broker_replicas[broker] += 1 }

  [leader].concat(other_replicas)
end

#brokersObject



14
15
16
# File 'lib/kazoo/replica_assigner.rb', line 14

def brokers
  @cluster.brokers
end

#cluster_leader_countObject



33
34
35
# File 'lib/kazoo/replica_assigner.rb', line 33

def cluster_leader_count
  broker_leaders.values.inject(0, &:+)
end

#cluster_replica_countObject



37
38
39
# File 'lib/kazoo/replica_assigner.rb', line 37

def cluster_replica_count
  broker_replicas.values.inject(0, &:+)
end

#retrieve_initial_countsObject



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kazoo/replica_assigner.rb', line 18

def retrieve_initial_counts
  @broker_leaders, @broker_replicas = {}, {}

  @cluster.brokers.each do |_, broker|
    @broker_leaders[broker], @broker_replicas[broker] = 0, 0
  end

  cluster.partitions.each do |partition|
    @broker_leaders[partition.preferred_leader] += 1
    partition.replicas.each do |broker|
      @broker_replicas[broker] += 1
    end
  end
end