Class: Ktl::Cluster

Inherits:
Command show all
Defined in:
lib/ktl/cluster.rb

Instance Method Summary collapse

Instance Method Details

#decommission_broker(broker_id) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/ktl/cluster.rb', line 80

def decommission_broker(broker_id)
  with_zk_client do |zk_client|
    if options.rendezvous?
      plan = RendezvousShufflePlan.new(zk_client, blacklist: [broker_id.to_i])
    else
      plan = DecommissionPlan.new(zk_client, broker_id.to_i)
    end
    reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
    execute_reassignment(reassigner, plan)
  end
end

#migrate_brokerObject



35
36
37
38
39
40
41
42
# File 'lib/ktl/cluster.rb', line 35

def migrate_broker
  with_zk_client do |zk_client|
    old_leader, new_leader = options.values_at(:from, :to)
    plan = MigrationPlan.new(zk_client, old_leader, new_leader)
    reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger)
    execute_reassignment(reassigner, plan)
  end
end

#preferred_replica(regexp = '.*') ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/ktl/cluster.rb', line 16

def preferred_replica(regexp='.*')
  with_zk_client do |zk_client|
    regexp = Regexp.new(regexp)
    partitions = zk_client.all_partitions
    partitions = partitions.filter { |tp| !!tp.topic.match(regexp) }.to_set
    if partitions.size > 0
      logger.info 'performing preferred replica leader election on %d partitions' % partitions.size
      Kafka::Admin.preferred_replica(zk_client.raw_client, partitions)
    else
      logger.info 'no topics matched %s' % regexp.inspect
    end
  end
end

#reassignment_progressObject



95
96
97
98
99
100
# File 'lib/ktl/cluster.rb', line 95

def reassignment_progress
  with_zk_client do |zk_client|
    progress = ReassignmentProgress.new(zk_client, options.merge(logger: logger))
    progress.display(shell)
  end
end

#shuffle(regexp = '.*') ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/ktl/cluster.rb', line 54

def shuffle(regexp='.*')
  with_zk_client do |zk_client|
    plan_factory = if options.rack_aware
      RackAwareShufflePlan
    elsif options.rendezvous
      RendezvousShufflePlan
    else
      ShufflePlan
    end
    plan = plan_factory.new(zk_client, {
      filter: Regexp.new(regexp),
      brokers: options.brokers,
      blacklist: options.blacklist,
      replication_factor: options.replication_factor,
      logger: logger,
      log_plan: options.dryrun,
    })
    reassigner = Reassigner.new(zk_client, limit: options.limit, logger: logger, log_assignments: options.verbose)
    execute_reassignment(reassigner, plan, options.dryrun)
  end
end

#statsObject



7
8
9
10
11
12
# File 'lib/ktl/cluster.rb', line 7

def stats
  with_zk_client do |zk_client|
    task = ClusterStatsTask.new(zk_client, shell)
    task.execute
  end
end