Class: Kazoo::CLI::Consumers

Inherits:
Thor
  • Object
show all
Includes:
Common
Defined in:
lib/kazoo/cli/consumers.rb

Instance Method Summary collapse

Methods included from Common

included

Instance Method Details

#delete(name) ⇒ Object

Raises:



72
73
74
75
76
77
78
79
80
# File 'lib/kazoo/cli/consumers.rb', line 72

# Destroys the consumer group looked up by +name+ on the Kafka cluster.
#
# The group is only destroyed when it reports itself as existing and
# not active; otherwise an error is raised and nothing is destroyed.
#
# @param name [Object] identifier handed to +kafka_cluster.consumergroup+
# @return [Object] whatever the group's +destroy+ call returns
# @raise [Kazoo::Error] when the group does not exist, or is still active
def delete(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)

  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  if group.active?
    raise Kazoo::Error, "Cannot remove consumergroup #{group.name} because it's still active"
  end

  group.destroy
end

#list ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/kazoo/cli/consumers.rb', line 7

# Prints one line per consumer group on the cluster, ordered by group name.
#
# A group with no instances is reported as "inactive"; any other group is
# reported with its number of running instances.
#
# @return [Object] the name-sorted collection of groups that was iterated
def list
  validate_class_options!

  groups_by_name = kafka_cluster.consumergroups.sort_by(&:name)

  groups_by_name.each do |group|
    running = group.instances.length
    status = running.zero? ? "inactive" : "#{running} running instances"
    puts "- #{group.name}: #{status}"
  end
end

#reset(name) ⇒ Object

Raises:



83
84
85
86
87
88
89
90
91
# File 'lib/kazoo/cli/consumers.rb', line 83

# Resets all offsets of the consumer group looked up by +name+.
#
# The offsets are only reset when the group reports itself as existing
# and not active; otherwise an error is raised and nothing is changed.
#
# @param name [Object] identifier handed to +kafka_cluster.consumergroup+
# @return [Object] whatever the group's +reset_all_offsets+ call returns
# @raise [Kazoo::Error] when the group does not exist, or is still active
def reset(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  # The message names the operation being refused (a reset, not a removal).
  raise Kazoo::Error, "Cannot reset consumergroup #{cg.name} because it's still active" if cg.active?

  cg.reset_all_offsets
end

#show(name) ⇒ Object

Raises:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/kazoo/cli/consumers.rb', line 21

# Prints a report on the consumer group looked up by +name+.
#
# The report always lists the group's name, creation time and topics.
# For a group with running instances it additionally lists each instance,
# the partition claims (or a warning when there are none), and a warning
# with any partitions that no instance has claimed. A group without
# instances is reported as inactive.
#
# @param name [Object] identifier handed to +kafka_cluster.consumergroup+
# @raise [Kazoo::Error] when the group does not exist
def show(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)
  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  # Shared ordering for partitions: topic name first, then partition id.
  by_topic_then_id = ->(partition) { [partition.topic.name, partition.id] }

  sorted_topics = group.topics.sort_by(&:name)

  puts "Consumer name: #{group.name}"
  puts "Created on: #{group.created_at}"
  puts "Topics (#{sorted_topics.length}): #{sorted_topics.map(&:name).join(', ')}"

  running = group.instances
  if running.length.zero?
    puts "This consumer group is inactive."
    return
  end

  puts
  puts "Running instances (#{running.length}):"
  running.each { |member| puts "- #{member.id}\t(created on #{member.created_at})" }

  claims = group.partition_claims
  if claims.length.zero?
    puts
    puts "WARNING: this consumer group is active but hasn't claimed any partitions"
  else
    claimed = claims.keys.sort_by(&by_topic_then_id)

    puts
    puts "Partition claims (#{claims.length}):"
    claimed.each do |partition|
      owner = claims[partition]
      puts "- #{partition.key}: #{owner.id}"
    end
  end

  # Partitions of the group that do not appear as a key in the claims.
  unclaimed = (group.partitions - claims.keys).sort_by(&by_topic_then_id)
  return if unclaimed.length.zero?

  puts
  puts "WARNING: this consumergroup has #{unclaimed.length} unclaimed partitions:"
  unclaimed.each { |partition| puts "- #{partition.key}" }
end