Class: Kazoo::ReplicaAssigner
- Inherits:
-
Object
- Object
- Kazoo::ReplicaAssigner
- Defined in:
- lib/kazoo/replica_assigner.rb
Overview
Helper class to assign replicas to brokers for new partitions. It tries to evenly divide the number of leaders and replicas over the brokers, in order to get a comparable load on all the brokers in the cluster.
Instance Attribute Summary collapse
-
#broker_leaders ⇒ Object
readonly
Returns the value of attribute broker_leaders.
-
#broker_replicas ⇒ Object
readonly
Returns the value of attribute broker_replicas.
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
Instance Method Summary collapse
- #assign(replication_factor) ⇒ Object
- #brokers ⇒ Object
- #cluster_leader_count ⇒ Object
- #cluster_replica_count ⇒ Object
-
#initialize(cluster) ⇒ ReplicaAssigner
constructor
A new instance of ReplicaAssigner.
- #retrieve_initial_counts ⇒ Object
Constructor Details
#initialize(cluster) ⇒ ReplicaAssigner
Returns a new instance of ReplicaAssigner.
9 10 11 12 |
# File 'lib/kazoo/replica_assigner.rb', line 9

# Stores the cluster and immediately tallies the leaders and replicas each
# of its brokers currently hosts (see #retrieve_initial_counts).
#
# @param cluster [Object] must respond to +brokers+ and +partitions+, which
#   are read while the initial counts are gathered
def initialize(cluster)
  @cluster = cluster
  retrieve_initial_counts
end
Instance Attribute Details
#broker_leaders ⇒ Object (readonly)
Returns the value of attribute broker_leaders.
7 8 9 |
# File 'lib/kazoo/replica_assigner.rb', line 7

# Running tally of leaders per broker.
#
# @return [Hash] maps each broker to the number of partitions it is counted
#   as leader for (seeded by #retrieve_initial_counts, bumped by #assign)
def broker_leaders
  @broker_leaders
end
#broker_replicas ⇒ Object (readonly)
Returns the value of attribute broker_replicas.
7 8 9 |
# File 'lib/kazoo/replica_assigner.rb', line 7

# Running tally of replicas per broker.
#
# @return [Hash] maps each broker to the number of partition replicas it is
#   counted as hosting (seeded by #retrieve_initial_counts, bumped by #assign)
def broker_replicas
  @broker_replicas
end
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
6 7 8 |
# File 'lib/kazoo/replica_assigner.rb', line 6

# @return [Object] the cluster this assigner was constructed with
def cluster
  @cluster
end
Instance Method Details
#assign(replication_factor) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/kazoo/replica_assigner.rb', line 41

# Chooses the brokers that will host the replicas of one new partition and
# records that choice in the running per-broker counts, so that successive
# calls keep spreading leaders and replicas over the brokers.
#
# @param replication_factor [Integer] number of replicas to place
# @return [Array] the selected brokers; the first element is the leader
# @raise [Kazoo::ValidationError] if replication_factor is zero or negative,
#   or larger than the number of brokers
def assign(replication_factor)
  raise Kazoo::ValidationError, "replication_factor should be higher than 0 " if replication_factor <= 0
  if replication_factor > brokers.length
    raise Kazoo::ValidationError, "replication_factor should not be higher than the number of brokers "
  end

  # The broker with the fewest leaders becomes the leader replica.
  # Ties are broken by the lowest broker id, which keeps the pick deterministic.
  leader, _leader_count = @broker_leaders.min_by { |broker, count| [count, broker.id] }

  # Update the current broker counts.
  # The assigned leader replica counts as a leader, but as a replica as well.
  @broker_leaders[leader] += 1
  @broker_replicas[leader] += 1

  # To assign the other replicas, drop the broker that was selected as leader,
  # order the rest by the number of replicas they currently host (then by id),
  # and take as many as are still needed to reach the replication factor.
  other_replicas = @broker_replicas
    .reject { |broker, _| broker == leader }
    .sort_by { |broker, count| [count, broker.id] }
    .first(replication_factor - 1)
    .map(&:first)

  # Update the current broker replica counts.
  other_replicas.each { |broker| @broker_replicas[broker] += 1 }

  [leader].concat(other_replicas)
end
#brokers ⇒ Object
14 15 16 |
# File 'lib/kazoo/replica_assigner.rb', line 14

# Convenience accessor that forwards to the cluster.
#
# @return [Object] whatever @cluster.brokers returns; #assign calls +length+
#   on it to validate the replication factor
def brokers
  @cluster.brokers
end
#cluster_leader_count ⇒ Object
33 34 35 |
# File 'lib/kazoo/replica_assigner.rb', line 33

# Total number of leaders tallied over all brokers.
#
# @return [Integer] sum of the values in #broker_leaders (0 when empty)
def cluster_leader_count
  broker_leaders.values.inject(0, :+)
end
#cluster_replica_count ⇒ Object
37 38 39 |
# File 'lib/kazoo/replica_assigner.rb', line 37

# Total number of replicas tallied over all brokers.
#
# @return [Integer] sum of the values in #broker_replicas (0 when empty)
def cluster_replica_count
  broker_replicas.values.inject(0, :+)
end
#retrieve_initial_counts ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/kazoo/replica_assigner.rb', line 18

# Rebuilds @broker_leaders and @broker_replicas from the cluster's current
# state: every broker starts at zero, then each existing partition adds one
# leader for its preferred leader and one replica for each broker in its
# replica list.
#
# @return [Object] the result of iterating @cluster.partitions (callers in
#   this class ignore it)
def retrieve_initial_counts
  @broker_leaders = {}
  @broker_replicas = {}

  # Seed every known broker with zero so brokers that host nothing yet are
  # still candidates in #assign.
  @cluster.brokers.each do |_id, broker|
    @broker_leaders[broker] = 0
    @broker_replicas[broker] = 0
  end

  # NOTE(review): the hashes have no default value, so a partition that
  # references a broker missing from @cluster.brokers raises NoMethodError
  # on nil here -- confirm whether that situation can occur.
  @cluster.partitions.each do |partition|
    @broker_leaders[partition.preferred_leader] += 1
    partition.replicas.each { |broker| @broker_replicas[broker] += 1 }
  end
end