Class: Kazoo::CLI::Topics
- Inherits:
- Thor
- Inheritance chain: Object → Thor → Kazoo::CLI::Topics
- Includes:
- Common
- Defined in:
- lib/kazoo/cli/topics.rb
Instance Method Summary collapse
- #create(name) ⇒ Object
- #delete(name) ⇒ Object
- #list ⇒ Object
- #partitions(topic) ⇒ Object
- #set_partitions(topic) ⇒ Object
Methods included from Common
Instance Method Details
#create(name) ⇒ Object
18 19 20 21 22 |
# File 'lib/kazoo/cli/topics.rb', line 18
#
# Creates a topic on the cluster returned by +kafka_cluster+, taking the
# partition count and replication factor from the parsed command-line
# options.
#
# +options+ is the parsed-options accessor provided by Thor (this class
# inherits from Thor). +kafka_cluster+ is not defined in this listing;
# presumably it comes from the included Common module -- confirm.
#
# NOTE(review): the listing's stated line range is longer than the body
# shown here, so the original file may contain statements that were lost
# when this page was extracted -- confirm against lib/kazoo/cli/topics.rb.
#
# @param name [String] name of the topic to create
# @return [Object] whatever +create_topic+ returns
def create(name)
  kafka_cluster.create_topic(
    name,
    partitions: options[:partitions],
    replication_factor: options[:replication_factor]
  )
end
#delete(name) ⇒ Object
25 26 27 28 29 |
# File 'lib/kazoo/cli/topics.rb', line 25
#
# Looks up the topic registered under +name+ and destroys it.
#
# The lookup uses +fetch+ on the cluster's topic collection, so an unknown
# name is not silently ignored (for a Hash, +fetch+ raises KeyError).
#
# NOTE(review): the listing's stated line range is longer than the body
# shown here, so the original file may contain statements that were lost
# when this page was extracted -- confirm against lib/kazoo/cli/topics.rb.
#
# @param name [String] name of the topic to remove
# @return [Object] whatever the topic's +destroy+ returns
def delete(name)
  doomed_topic = kafka_cluster.topics.fetch(name)
  doomed_topic.destroy
end
#list ⇒ Object
7 8 9 10 11 12 13 |
# File 'lib/kazoo/cli/topics.rb', line 7
#
# Prints the name of every topic in the cluster to standard output, one per
# line, ordered by name.
#
# NOTE(review): the listing's stated line range is longer than the body
# shown here, so the original file may contain statements that were lost
# when this page was extracted -- confirm against lib/kazoo/cli/topics.rb.
#
# @return [Array] the name-sorted topics that were printed
def list
  topics_by_name = kafka_cluster.topics.values.sort_by { |entry| entry.name }
  topics_by_name.each { |entry| $stdout.puts entry.name }
end
#partitions(topic) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/kazoo/cli/topics.rb', line 32
#
# Prints one tab-separated line per partition of the given topic: the
# partition key, the ids of its replicas, and the ids of its ISR entries
# (each id list comma-joined).
#
# NOTE(review): the listing's stated line range is longer than the body
# shown here, so the original file may contain statements that were lost
# when this page was extracted -- confirm against lib/kazoo/cli/topics.rb.
#
# @param topic [String] name of the topic to inspect
# @return [Object] the topic's partition collection (the receiver of +each+)
def partitions(topic)
  kafka_cluster.topics.fetch(topic).partitions.each do |part|
    key = part.key
    replica_ids = part.replicas.map(&:id).join(",")
    isr_ids = part.isr.map(&:id).join(",")
    puts "#{key}\tReplicas: #{replica_ids}\tISR: #{isr_ids}"
  end
end
#set_partitions(topic) ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/kazoo/cli/topics.rb', line 44
#
# Grows a topic to the partition count given in the parsed command-line
# options. Only the difference between the requested count and the current
# count is added; the replication factor for the new partitions comes from
# the options when given, otherwise from the topic itself.
#
# +options+ is the parsed-options accessor provided by Thor (this class
# inherits from Thor). +kafka_cluster+ is not defined in this listing;
# presumably it comes from the included Common module -- confirm.
#
# NOTE(review): the listing's stated line range is longer than the body
# shown here, so the original file may contain statements that were lost
# when this page was extracted -- confirm against lib/kazoo/cli/topics.rb.
#
# @param topic [String] name of the topic to extend
# @return [Object] whatever the topic's +add_partitions+ returns
# @raise [RuntimeError] if the requested count does not exceed the current
#   number of partitions
def set_partitions(topic)
  kafka_topic = kafka_cluster.topics.fetch(topic)

  # The option holds the desired total; add_partitions wants the increment.
  new_partitions = options[:partitions] - kafka_topic.partitions.length
  raise "You can only add partitions to a topic, not remove them" if new_partitions <= 0

  replication_factor = options[:replication_factor] || kafka_topic.replication_factor
  kafka_topic.add_partitions(partitions: new_partitions, replication_factor: replication_factor)
end