Class: Kafkat::Command::Reassign

Inherits:
Base
  • Object
show all
Defined in:
lib/kafkat/command/reassign.rb

Instance Attribute Summary

Attributes inherited from Base

#config

Instance Method Summary collapse

Methods inherited from Base

#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper

Methods included from Kafkat::CommandIO

#prompt_and_execute_assignments

Methods included from Formatting

#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header

Constructor Details

This class inherits a constructor from Kafkat::Command::Base

Instance Method Details

#runObject



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/kafkat/command/reassign.rb', line 9

def run
  topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

  all_brokers = zookeeper.get_brokers
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  opts = Trollop.options do
    opt :brokers, "replica set (broker IDs)", type: :string
    opt :replicas, "number of replicas (count)", type: :integer
  end

  broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
  replica_count = opts[:replicas]

  broker_ids ||= zookeeper.get_brokers.values.map(&:id)

  all_brokers_id = all_brokers.values.map(&:id)
  broker_ids.each do |id|
    if !all_brokers_id.include?(id)
      print "ERROR: Broker #{id} is not currently active.\n"
      exit 1
    end
  end

  # *** This logic is duplicated from Kakfa 0.8.1.1 ***

  assignments = []
  broker_count = broker_ids.size

  topics.each do |_, t|
    # This is how Kafka's AdminUtils determines these values.
    partition_count = t.partitions.size
    topic_replica_count = replica_count || t.partitions[0].replicas.size

    if topic_replica_count > broker_count
      print "ERROR: Replication factor (#{topic_replica_count}) is larger than brokers (#{broker_count}).\n"
      exit 1
    end

    start_index = Random.rand(broker_count)
    replica_shift = Random.rand(broker_count)

    t.partitions.each do |p|
      replica_shift += 1 if p.id > 0 && p.id % broker_count == 0
      first_replica_index = (p.id + start_index) % broker_count

      replicas = [broker_ids[first_replica_index]]

      (0...topic_replica_count-1).each do |i|
        shift = 1 + (replica_shift + i) % (broker_count - 1)
        index = (first_replica_index + shift) % broker_count
        replicas << broker_ids[index]
      end

      replicas.reverse!
      assignments << Assignment.new(t.name, p.id, replicas)
    end
  end

  # ****************

  prompt_and_execute_assignments(assignments)
end