Class: Kazoo::Cluster
- Inherits:
- Object
- Defined in:
- lib/kazoo/cluster.rb
Overview
Kazoo::Cluster represents a full Kafka cluster, based on how it is registered in Zookeeper. It allows you to inspect the brokers of the cluster, the topics and partition metadata, and the consumer groups that are registered against the cluster.
Instance Attribute Summary
-
#zookeeper ⇒ Object
readonly
Returns the value of attribute zookeeper.
Instance Method Summary
-
#brokers ⇒ Object
Returns a hash of all the brokers in the cluster, indexed by broker ID.
-
#close ⇒ Object
Closes the zookeeper connection and clears all the local caches.
-
#consumergroup(name) ⇒ Object
Returns a Kazoo::Consumergroup instance for a given consumer name.
-
#consumergroups ⇒ Object
Returns a list of consumer groups that are registered against the Kafka cluster.
-
#create_topic(name, partitions: nil, replication_factor: nil, config: nil) ⇒ Object
Creates a topic on the Kafka cluster, with the provided number of partitions and replication factor.
-
#initialize(zookeeper) ⇒ Cluster
constructor
A new instance of Cluster.
-
#partitions ⇒ Object
Returns a list of all partitions hosted by the cluster.
-
#preferred_leader_election(partitions: nil) ⇒ Object
Triggers a preferred leader election for the provided list of partitions.
-
#reset_metadata ⇒ Object
Resets the locally cached list of brokers and topics, which will mean they will be fetched freshly from Zookeeper the next time they are requested.
-
#topic(name) ⇒ Object
Returns a Kazoo::Topic for a given topic name.
-
#topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS) ⇒ Object
Returns a hash of all the topics in the Kafka cluster, indexed by the topic name.
-
#under_replicated? ⇒ Boolean
Returns true if any of the partitions hosted by the cluster are under-replicated.
-
#zk ⇒ Object
Returns a zookeeper connection.
Constructor Details
#initialize(zookeeper) ⇒ Cluster
Returns a new instance of Cluster.
# File 'lib/kazoo/cluster.rb', line 9
def initialize(zookeeper)
  @zookeeper = zookeeper
  @zk_mutex, @brokers_mutex, @topics_mutex = Mutex.new, Mutex.new, Mutex.new
end
Instance Attribute Details
#zookeeper ⇒ Object (readonly)
Returns the value of attribute zookeeper.
# File 'lib/kazoo/cluster.rb', line 7
def zookeeper
  @zookeeper
end
Instance Method Details
#brokers ⇒ Object
Returns a hash of all the brokers in the cluster, indexed by broker ID.
# File 'lib/kazoo/cluster.rb', line 22
def brokers
  @brokers_mutex.synchronize do
    @brokers ||= begin
      brokers = zk.get_children(path: "/brokers/ids")

      if brokers.fetch(:rc) != Zookeeper::Constants::ZOK
        raise NoClusterRegistered, "No Kafka cluster registered on this Zookeeper location."
      end

      result, mutex = {}, Mutex.new
      threads = brokers.fetch(:children).map do |id|
        Thread.new do
          Thread.abort_on_exception = true
          broker_info = zk.get(path: "/brokers/ids/#{id}")
          raise Kazoo::Error, "Failed to retrieve broker info. Error code: #{broker_info.fetch(:rc)}" unless broker_info.fetch(:rc) == Zookeeper::Constants::ZOK

          broker = Kazoo::Broker.from_json(self, id, JSON.parse(broker_info.fetch(:data)))
          mutex.synchronize { result[id.to_i] = broker }
        end
      end
      threads.each(&:join)
      result
    end
  end
end
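The listing above fetches each broker's metadata in its own thread and guards the shared result hash with a mutex. A minimal, self-contained sketch of that pattern follows. `fetch_broker_info` and `fetch_all_brokers` are hypothetical helpers that stand in for the Zookeeper reads; they are not part of Kazoo.

```ruby
# Hypothetical stand-in for a Zookeeper read; not part of Kazoo.
def fetch_broker_info(id)
  { "host" => "kafka#{id}.example.com", "port" => 9092 }
end

# Fetches every broker concurrently and collects the results by integer ID.
def fetch_all_brokers(ids)
  result = {}
  mutex = Mutex.new

  threads = ids.map do |id|
    Thread.new do
      info = fetch_broker_info(id)
      # Only the write to the shared hash needs the lock.
      mutex.synchronize { result[id.to_i] = info }
    end
  end

  # Wait for all lookups to finish before returning the hash.
  threads.each(&:join)
  result
end

brokers = fetch_all_brokers(%w[1 2 3])
puts brokers.keys.sort.inspect
```

Zookeeper returns child names as strings, which is why the sketch, like the listing, converts each ID with `to_i` before using it as a hash key.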
#close ⇒ Object
Closes the zookeeper connection and clears all the local caches.
# File 'lib/kazoo/cluster.rb', line 121
def close
  zk.close
  @zk = nil
end
#consumergroup(name) ⇒ Object
Returns a Kazoo::Consumergroup instance for a given consumer name.
Note that this doesn’t register a new consumer group in Zookeeper; you will have to call Kazoo::Consumergroup.create to do that.
# File 'lib/kazoo/cluster.rb', line 60
def consumergroup(name)
  Kazoo::Consumergroup.new(self, name)
end
#consumergroups ⇒ Object
Returns a list of consumer groups that are registered against the Kafka cluster.
# File 'lib/kazoo/cluster.rb', line 49
def consumergroups
  @consumergroups ||= begin
    consumers = zk.get_children(path: "/consumers")
    consumers.fetch(:children).map { |name| Kazoo::Consumergroup.new(self, name) }
  end
end
#create_topic(name, partitions: nil, replication_factor: nil, config: nil) ⇒ Object
Creates a topic on the Kafka cluster, with the provided number of partitions and replication factor.
# File 'lib/kazoo/cluster.rb', line 82
def create_topic(name, partitions: nil, replication_factor: nil, config: nil)
  raise ArgumentError, "partitions must be a positive integer" if Integer(partitions) <= 0
  raise ArgumentError, "replication_factor must be a positive integer" if Integer(replication_factor) <= 0

  Kazoo::Topic.create(self, name, partitions: Integer(partitions), replication_factor: Integer(replication_factor), config: config)
end
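The validation above relies on `Kernel#Integer`, which raises instead of returning a fallback value. Because both keywords default to `nil`, leaving one out surfaces as a `TypeError` from `Integer(nil)` rather than the `ArgumentError` message. The sketch below illustrates that behavior in isolation. `positive_integer!` and `error_class_for` are hypothetical helpers written for this example, not Kazoo methods.

```ruby
# Hypothetical helper mirroring the checks in the listing above.
def positive_integer!(value, name)
  number = Integer(value) # raises TypeError for nil, ArgumentError for "abc"
  raise ArgumentError, "#{name} must be a positive integer" if number <= 0
  number
end

# Returns the exception class raised for a value, or nil if it is accepted.
def error_class_for(value)
  positive_integer!(value, "partitions")
  nil
rescue ArgumentError, TypeError => e
  e.class
end

puts positive_integer!("3", "partitions")
puts error_class_for(nil).inspect
puts error_class_for(0).inspect
```

Callers that want a single error type for bad input may prefer to rescue both `ArgumentError` and `TypeError` around `create_topic`.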
#partitions ⇒ Object
Returns a list of all partitions hosted by the cluster.
# File 'lib/kazoo/cluster.rb', line 90
def partitions
  topics.values.flat_map(&:partitions)
end
#preferred_leader_election(partitions: nil) ⇒ Object
Triggers a preferred leader election for the provided list of partitions. If no list of partitions is provided, the preferred leader will be elected for all partitions in the cluster.
# File 'lib/kazoo/cluster.rb', line 107
def preferred_leader_election(partitions: nil)
  partitions = self.partitions if partitions.nil?
  result = zk.create(path: "/admin/preferred_replica_election", data: JSON.generate(version: 1, partitions: partitions))
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return true
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "Another preferred leader election is still in progress"
  else
    raise Kazoo::Error, "Failed to start preferred leadership election. Result code: #{result.fetch(:rc)}"
  end
end
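The method above does two things: it serializes the partition list into a JSON payload, and it maps the Zookeeper result code onto a return value or an error. The sketch below reproduces both steps without a Zookeeper connection. The constants `ZOK` and `ZNODEEXISTS`, the `ElectionError` class, and both helper methods are local stand-ins defined for this example. The topic/partition hash shape is an assumption about how partition objects serialize.

```ruby
require "json"

# Local stand-ins for Zookeeper::Constants; defined here for illustration only.
ZOK = 0
ZNODEEXISTS = -110

# Hypothetical error class standing in for Kazoo::Error.
class ElectionError < StandardError; end

# Builds the JSON document written to the election node.
# Assumes each partition serializes as a topic/partition pair.
def election_payload(partitions)
  JSON.generate(version: 1, partitions: partitions)
end

# Maps a result code onto success or a descriptive error.
def interpret_result(rc)
  case rc
  when ZOK
    true
  when ZNODEEXISTS
    raise ElectionError, "Another preferred leader election is still in progress"
  else
    raise ElectionError, "Failed to start preferred leadership election. Result code: #{rc}"
  end
end

payload = election_payload([{ topic: "events", partition: 0 }])
puts payload
puts interpret_result(ZOK)
```

Since an existing node signals that an election is still pending, a caller could rescue the error and retry later instead of treating it as fatal.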
#reset_metadata ⇒ Object
Resets the locally cached list of brokers and topics, which will mean they will be fetched freshly from Zookeeper the next time they are requested.
# File 'lib/kazoo/cluster.rb', line 96
def reset_metadata
  @topics, @brokers, @consumergroups = nil, nil, nil
end
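Resetting works because the cached readers memoize with `||=`: once the instance variable is `nil` again, the next call reloads the data. The sketch below shows that interaction in isolation. `CachedLookup` is a hypothetical class written for this example, and its loader block stands in for a Zookeeper read.

```ruby
# Hypothetical class illustrating memoize-then-reset; not part of Kazoo.
class CachedLookup
  attr_reader :loads

  def initialize(&loader)
    @loader = loader
    @mutex = Mutex.new
    @loads = 0
    @value = nil
  end

  # Loads the value on first access and reuses it afterwards.
  def value
    @mutex.synchronize do
      @value ||= begin
        @loads += 1
        @loader.call
      end
    end
  end

  # Drops the cached value so the next read triggers a fresh load.
  def reset
    @value = nil
  end
end

cache = CachedLookup.new { %w[topic_a topic_b] }
cache.value # first load
cache.value # served from the cache
cache.reset
cache.value # loaded again after the reset
puts cache.loads
```

As in the listing above, the reset in this sketch does not take the mutex, so a reset that races with an in-flight load may be overwritten by that load's result.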
#topic(name) ⇒ Object
Returns a Kazoo::Topic for a given topic name.
# File 'lib/kazoo/cluster.rb', line 76
def topic(name)
  Kazoo::Topic.new(self, name)
end
#topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS) ⇒ Object
Returns a hash of all the topics in the Kafka cluster, indexed by the topic name.
# File 'lib/kazoo/cluster.rb', line 65
def topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS)
  @topics_mutex.synchronize do
    @topics ||= begin
      topics = zk.get_children(path: "/brokers/topics")
      raise Kazoo::Error, "Failed to list topics. Error code: #{topics.fetch(:rc)}" unless topics.fetch(:rc) == Zookeeper::Constants::ZOK
      preload_topics_from_names(topics.fetch(:children), preload: preload)
    end
  end
end
#under_replicated? ⇒ Boolean
Returns true if any of the partitions hosted by the cluster are under-replicated.
# File 'lib/kazoo/cluster.rb', line 101
def under_replicated?
  partitions.any?(&:under_replicated?)
end
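The check above composes two steps: the partitions of every topic are flattened into one list, and the cluster counts as under-replicated as soon as any single partition is. The sketch below shows that composition with plain structs. `Partition` and `Topic` here are hypothetical stand-ins, and the rule that a partition is under-replicated when its in-sync replica set is smaller than its replica set is an assumption about the per-partition check.

```ruby
# Hypothetical stand-ins for Kazoo::Partition and Kazoo::Topic.
Partition = Struct.new(:id, :replicas, :isr) do
  # Assumed rule: fewer in-sync replicas than assigned replicas.
  def under_replicated?
    isr.length < replicas.length
  end
end

Topic = Struct.new(:name, :partitions)

topics = {
  "events" => Topic.new("events", [
    Partition.new(0, [1, 2, 3], [1, 2, 3]),
    Partition.new(1, [2, 3, 1], [2, 3])
  ]),
  "logs" => Topic.new("logs", [
    Partition.new(0, [3, 1, 2], [3, 1, 2])
  ])
}

# Flatten the per-topic partition lists into one list.
all_partitions = topics.values.flat_map(&:partitions)

# True as soon as one partition reports under-replication.
cluster_under_replicated = all_partitions.any?(&:under_replicated?)

puts all_partitions.length
puts cluster_under_replicated
```

`any?` stops at the first match, so a healthy cluster is the slow case: every partition has to be inspected before the method can return false.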
#zk ⇒ Object
Returns a zookeeper connection.
# File 'lib/kazoo/cluster.rb', line 15
def zk
  @zk_mutex.synchronize do
    @zk ||= Zookeeper.new(zookeeper)
  end
end