Class: Ktl::Cluster
Instance Method Summary
- #decommission_broker(broker_id) ⇒ Object
- #migrate_broker ⇒ Object
- #preferred_replica(regexp = '.*') ⇒ Object
- #reassignment_progress ⇒ Object
- #shuffle(regexp = '.*') ⇒ Object
- #stats ⇒ Object
Instance Method Details
#decommission_broker(broker_id) ⇒ Object
Source lines 80–90:
# File 'lib/ktl/cluster.rb', line 80
#
# Builds a reassignment plan for the given broker and passes it, together
# with a Reassigner, to +execute_reassignment+.
#
# If +options.rendezvous?+ is truthy the plan is a RendezvousShufflePlan with
# the broker in its blacklist; otherwise it is a DecommissionPlan for that
# broker.
#
# NOTE(review): the extracted listing had lost the receiver in front of
# `.rendezvous?` and `.limit`; `options` is assumed here -- confirm against
# the real lib/ktl/cluster.rb.
#
# @param broker_id [#to_i] id of the broker; converted with +to_i+
# @return [Object] the value returned by +with_zk_client+
def decommission_broker(broker_id)
  with_zk_client do |zk_client|
    plan =
      if options.rendezvous?
        RendezvousShufflePlan.new(zk_client, blacklist: [broker_id.to_i])
      else
        DecommissionPlan.new(zk_client, broker_id.to_i)
      end
    reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
    execute_reassignment(reassigner, plan)
  end
end
#migrate_broker ⇒ Object
Source lines 35–42:
# File 'lib/ktl/cluster.rb', line 35
#
# Builds a MigrationPlan from the +:from+ and +:to+ option values and passes
# it, together with a Reassigner, to +execute_reassignment+.
#
# NOTE(review): the extracted listing had lost the receiver in front of
# `.values_at` and `.limit`; `options` is assumed here -- confirm against the
# real lib/ktl/cluster.rb.
#
# @return [Object] the value returned by +with_zk_client+
def migrate_broker
  with_zk_client do |zk_client|
    old_leader, new_leader = options.values_at(:from, :to)
    plan = MigrationPlan.new(zk_client, old_leader, new_leader)
    reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
    execute_reassignment(reassigner, plan)
  end
end
#preferred_replica(regexp = '.*') ⇒ Object
Source lines 16–28:
# File 'lib/ktl/cluster.rb', line 16
#
# Requests a preferred replica leader election for every partition whose
# topic matches the given pattern. Logs a message and does nothing else when
# no partition matches.
#
# @param regexp [String, Regexp] pattern passed to +Regexp.new+ and matched
#   against each partition's topic
# @return [Object] the value returned by +with_zk_client+
def preferred_replica(regexp = '.*')
  with_zk_client do |zk_client|
    pattern = Regexp.new(regexp)
    matching = zk_client.all_partitions.filter { |tp| !!tp.topic.match(pattern) }.to_set
    if matching.size.zero?
      logger.info 'no topics matched %s' % pattern.inspect
    else
      logger.info 'performing preferred replica leader election on %d partitions' % matching.size
      Kafka::Admin.preferred_replica(zk_client.raw_client, matching)
    end
  end
end
#reassignment_progress ⇒ Object
Source lines 95–100:
# File 'lib/ktl/cluster.rb', line 95
#
# Creates a ReassignmentProgress from the current options (with the logger
# merged in under +:logger+) and asks it to display itself on the shell.
#
# NOTE(review): the extracted listing had lost the receiver in front of
# `.merge`; `options` is assumed here -- confirm against the real
# lib/ktl/cluster.rb.
#
# @return [Object] the value returned by +with_zk_client+
def reassignment_progress
  with_zk_client do |zk_client|
    progress = ReassignmentProgress.new(zk_client, options.merge(logger: logger))
    progress.display(shell)
  end
end
#shuffle(regexp = '.*') ⇒ Object
Source lines 54–74:
# File 'lib/ktl/cluster.rb', line 54
#
# Builds a shuffle plan restricted to topics matching the given pattern and
# passes it, together with a Reassigner and the +dryrun+ option, to
# +execute_reassignment+.
#
# The plan class is RackAwareShufflePlan when +options.rack_aware+ is truthy,
# else RendezvousShufflePlan when +options.rendezvous+ is truthy, else
# ShufflePlan.
#
# NOTE(review): the extracted listing had lost the receiver in front of every
# option read (`.rack_aware`, `.rendezvous`, `.brokers`, `.blacklist`,
# `.replication_factor`, `.dryrun`, `.limit`, `.verbose`); `options` is
# assumed here -- confirm against the real lib/ktl/cluster.rb.
#
# @param regexp [String, Regexp] pattern passed to +Regexp.new+ and handed to
#   the plan as its +:filter+
# @return [Object] the value returned by +with_zk_client+
def shuffle(regexp = '.*')
  with_zk_client do |zk_client|
    # rack_aware takes precedence over rendezvous when both are set.
    plan_factory =
      if options.rack_aware
        RackAwareShufflePlan
      elsif options.rendezvous
        RendezvousShufflePlan
      else
        ShufflePlan
      end
    plan = plan_factory.new(zk_client, {
      filter: Regexp.new(regexp),
      brokers: options.brokers,
      blacklist: options.blacklist,
      replication_factor: options.replication_factor,
      logger: logger,
      log_plan: options.dryrun,
    })
    reassigner = Reassigner.new(
      zk_client,
      limit: options.limit,
      logger: logger,
      log_assignments: options.verbose
    )
    execute_reassignment(reassigner, plan, options.dryrun)
  end
end
#stats ⇒ Object
Source lines 7–12:
# File 'lib/ktl/cluster.rb', line 7
#
# Executes a ClusterStatsTask constructed from the client yielded by
# +with_zk_client+ and the shell.
#
# @return [Object] the value returned by +with_zk_client+
def stats
  with_zk_client { |zk| ClusterStatsTask.new(zk, shell).execute }
end