21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/kazoo/cli/consumers.rb', line 21
# Prints a plain-text report about a single consumer group to standard output:
# its name, creation time and sorted topic names, followed by (when the group
# has running instances) the instance list, the partition claims, and a
# warning listing any partitions that nobody has claimed.
#
# @param name [String] identifier handed to +kafka_cluster.consumergroup+
#   (presumably the consumer group's name -- confirm against callers)
# @raise [Kazoo::Error] when the looked-up group answers false to +exists?+
def show(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)
  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  sorted_topics = group.topics.sort_by(&:name)
  puts "Consumer name: #{group.name}"
  puts "Created on: #{group.created_at}"
  puts "Topics (#{sorted_topics.length}): #{sorted_topics.map(&:name).join(', ')}"

  # A group without running instances has nothing further to report.
  members = group.instances
  if members.empty?
    puts "This consumer group is inactive."
    return
  end

  puts
  puts "Running instances (#{members.length}):"
  members.each { |member| puts "- #{member.id}\t(created on #{member.created_at})" }

  # Shared ordering for both partition listings: topic name first, then id.
  by_topic_and_id = ->(partition) { [partition.topic.name, partition.id] }

  claims = group.partition_claims
  if claims.empty?
    puts
    puts "WARNING: this consumer group is active but hasn't claimed any partitions"
  else
    claimed = claims.keys.sort_by(&by_topic_and_id)
    puts
    puts "Partition claims (#{claims.length}):"
    claimed.each do |partition|
      owner = claims[partition]
      puts "- #{partition.key}: #{owner.id}"
    end
  end

  # Whatever the group consumes minus what is claimed is left unattended.
  unclaimed = (group.partitions - claims.keys).sort_by(&by_topic_and_id)
  return if unclaimed.empty?

  puts
  puts "WARNING: this consumergroup has #{unclaimed.length} unclaimed partitions:"
  unclaimed.each { |partition| puts "- #{partition.key}" }
end
|