Class: Ktl::DecommissionPlan

Inherits:
Object
  • Object
show all
Defined in:
lib/ktl/decommission_plan.rb

Instance Method Summary collapse

Constructor Details

#initialize(zk_client, broker_id) ⇒ DecommissionPlan

Returns a new instance of DecommissionPlan.



5
6
7
8
9
10
# File 'lib/ktl/decommission_plan.rb', line 5

# Sets up a plan for taking a single broker out of service.
#
# @param zk_client client object later queried for broker ids, partitions
#   and replica assignments
# @param broker_id id of the broker that is to be decommissioned
def initialize(zk_client, broker_id)
  @zk_client, @broker_id = zk_client, broker_id
  # Two independent tallies; a key that was never counted reads as 0.
  @leaders_count = Hash.new(0)
  @replicas_count = Hash.new(0)
end

Instance Method Details

#generate ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ktl/decommission_plan.rb', line 12

# Builds a reassignment plan that swaps the decommissioned broker out of
# every partition whose replica list currently contains it.
#
# Partitions are visited in the order of the string formed by the topic name
# followed by the partition number. Partitions that do not list the broker
# are left out of the plan.
#
# @return [Object] a map that starts as Scala::Collection::Map.empty and
#   gains one (partition, new replicas) tuple per affected partition
# @raise [InsufficientBrokersRemainingError] when fewer brokers remain than
#   an affected partition has replicas
def generate
  plan = Scala::Collection::Map.empty
  remaining = @zk_client.broker_ids - @broker_id
  all_partitions = @zk_client.all_partitions
  assignments = @zk_client.replica_assignment_for_topics(topics_from(all_partitions))
  # NOTE(review): presumably fills the leader/replica tallies that
  # elect_new_broker consults — confirm against those helpers.
  count_leaders_and_replicas(assignments)

  ordered = ScalaEnumerable.new(all_partitions).sort_by { |tp| tp.topic + tp.partition.to_s }
  ordered.each do |topic_partition|
    current = assignments[topic_partition]
    next unless current.contains?(@broker_id)

    if remaining.size < current.size
      raise InsufficientBrokersRemainingError, %(#{remaining.size} remaining brokers, #{current.size} replicas needed)
    end

    # Only brokers that are not already replicas of this partition qualify.
    candidates = ScalaEnumerable.new(remaining.diff(current)).sort
    slot = current.index_of(@broker_id)
    replacement = elect_new_broker(slot, candidates)
    # The replacement takes the exact position the old broker occupied.
    plan += Scala::Tuple.new(topic_partition, current.updated(slot, replacement, CanBuildFrom))
  end
  plan
end