Class: Ktl::DecommissionPlan
- Inherits:
-
Object
- Object
- Ktl::DecommissionPlan
- Defined in:
- lib/ktl/decommission_plan.rb
Instance Method Summary collapse
- #generate ⇒ Object
-
#initialize(zk_client, broker_id) ⇒ DecommissionPlan
constructor
A new instance of DecommissionPlan.
Constructor Details
#initialize(zk_client, broker_id) ⇒ DecommissionPlan
Returns a new instance of DecommissionPlan.
5 6 7 8 9 10 |
# File 'lib/ktl/decommission_plan.rb', line 5 def initialize(zk_client, broker_id) @zk_client = zk_client @broker_id = broker_id @replicas_count = Hash.new(0) @leaders_count = Hash.new(0) end |
Instance Method Details
#generate ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/ktl/decommission_plan.rb', line 12 def generate plan = Scala::Collection::Map.empty brokers = @zk_client.broker_ids brokers = brokers - @broker_id partitions = @zk_client.all_partitions topics = topics_from(partitions) assignments = @zk_client.replica_assignment_for_topics(topics) count_leaders_and_replicas(assignments) partitions = ScalaEnumerable.new(partitions).sort_by { |tp| tp.topic + tp.partition.to_s } partitions.each do |tp| replicas = assignments[tp] if replicas.contains?(@broker_id) if brokers.size >= replicas.size brokers_diff = ScalaEnumerable.new(brokers.diff(replicas)).sort broker_index = replicas.index_of(@broker_id) new_broker = elect_new_broker(broker_index, brokers_diff) new_replicas = replicas.updated(broker_index, new_broker, CanBuildFrom) plan += Scala::Tuple.new(tp, new_replicas) else raise InsufficientBrokersRemainingError, %(#{brokers.size} remaining brokers, #{replicas.size} replicas needed) end end end plan end |