Class: Kafkat::Command::VerifyReplicas

Inherits:
Base
  • Object
show all
Defined in:
lib/kafkat/command/verify-replicas.rb

Instance Attribute Summary

Attributes inherited from Base

#config

Instance Method Summary collapse

Methods inherited from Base

#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper

Methods included from Logging

#print_err

Methods included from Kafkat::CommandIO

#prompt_and_execute_assignments

Methods included from Formatting

#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header, #print_topic_name

Constructor Details

This class inherits a constructor from Kafkat::Command::Base

Instance Method Details



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/kafkat/command/verify-replicas.rb', line 58

def print_mismatched_partitions(partition_replica_size, partition_replica_size_stat, print_details, print_summary)
  topic_column_width = partition_replica_size.keys.max_by(&:length).length
  if print_details
    printf "%-#{topic_column_width}s %-10s %-15s %-20s\n", "topic", "partition", "replica_size", "replication_factor"

    partition_replica_size.each do |topic_name, partition|
      replication_factor = partition_replica_size_stat[topic_name].key(partition_replica_size_stat[topic_name].values.max)

      partition.each do |id, replica_size|
        if replica_size != replication_factor
          printf "%-#{topic_column_width}s %-10d %-15d %-20d\n", topic_name, id, replica_size, replication_factor
        end
      end
    end
  end

  if print_summary
    printf "%-#{topic_column_width}s %-15s %-10s %-15s %-20s\n", "topic", "replica_size", "count", "percentage", "replication_factor"
    partition_replica_size_stat.each do |topic_name, partition|
      if partition.values.size > 1
        replication_factor = partition_replica_size_stat[topic_name].key(partition_replica_size_stat[topic_name].values.max)
        num_partitions = 0.0
        partition.each { |key, value| num_partitions += value }

        partition.each do |replica_size, cnt|
          printf "%-#{topic_column_width}s %-15d %-10d %-15d %-20d\n", topic_name, replica_size, cnt, (cnt * 100 /num_partitions)
                                                                                     .to_i, replication_factor
        end
      end
    end
  end
end

#runObject



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/kafkat/command/verify-replicas.rb', line 9

def run
  opts = Trollop.options do
    opt :topics, "topic names", type: :string
    opt :broker, "broker ID", type: :string
    opt :print_details, "show replica size of mismatched partitions", :default => false
    opt :print_summary, "show summary of mismatched partitions", :default => false
  end

  topic_names = opts[:topics]
  print_details = opts[:print_details]
  print_summary = opts[:print_summary]

  if topic_names
    topics_list = topic_names.split(',')
    topics = zookeeper.get_topics(topics_list)
  end
  topics ||= zookeeper.get_topics
  broker = opts[:broker] && opts[:broker].to_i

  partition_replica_size, partition_replica_size_stat = verify_replicas(broker, topics)

  print_summary = !print_details || print_summary
  print_mismatched_partitions(partition_replica_size, partition_replica_size_stat, print_details, print_summary)
end

#verify_replicas(broker, topics) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/kafkat/command/verify-replicas.rb', line 34

def verify_replicas(broker, topics)
  partition_replica_size = {}
  partition_replica_size_stat = {}

  topics.each do |_, t|
    partition_replica_size[t.name] = {}
    partition_replica_size_stat[t.name] = {}

    t.partitions.each do |p|
      replica_size = p.replicas.length

      next if broker && !p.replicas.include?(broker)

      partition_replica_size_stat[t.name][replica_size] ||= 0
      partition_replica_size_stat[t.name][replica_size] += 1

      partition_replica_size[t.name][p.id] = replica_size
    end

  end

  return partition_replica_size, partition_replica_size_stat
end