Class: Kazoo::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/cluster.rb

Overview

Kazoo::Cluster represents a full Kafka cluster, based on how it is registered in Zookeeper. It allows you the inspect the brokers of the cluster, the topics and partition metadata, and the consumergroups that are registered against the cluster.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(zookeeper) ⇒ Cluster

Returns a new instance of Cluster.



9
10
11
12
# File 'lib/kazoo/cluster.rb', line 9

def initialize(zookeeper)
  @zookeeper = zookeeper
  @zk_mutex, @brokers_mutex, @topics_mutex = Mutex.new, Mutex.new, Mutex.new
end

Instance Attribute Details

#zookeeperObject (readonly)

Returns the value of attribute zookeeper.



7
8
9
# File 'lib/kazoo/cluster.rb', line 7

def zookeeper
  @zookeeper
end

Instance Method Details

#brokersObject

Returns a hash of all the brokers in the



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kazoo/cluster.rb', line 22

def brokers
  @brokers_mutex.synchronize do
    @brokers ||= begin
      brokers = zk.get_children(path: "/brokers/ids")

      if brokers.fetch(:rc) != Zookeeper::Constants::ZOK
        raise NoClusterRegistered, "No Kafka cluster registered on this Zookeeper location."
      end

      result, mutex = {}, Mutex.new
      threads = brokers.fetch(:children).map do |id|
        Thread.new do
          Thread.abort_on_exception = true
          broker_info = zk.get(path: "/brokers/ids/#{id}")
          raise Kazoo::Error, "Failed to retrieve broker info. Error code: #{broker_info.fetch(:rc)}" unless broker_info.fetch(:rc) == Zookeeper::Constants::ZOK

          broker = Kazoo::Broker.from_json(self, id, JSON.parse(broker_info.fetch(:data)))
          mutex.synchronize { result[id.to_i] = broker }
        end
      end
      threads.each(&:join)
      result
    end
  end
end

#closeObject

Closes the zookeeper connection and clears all the local caches.



121
122
123
124
125
# File 'lib/kazoo/cluster.rb', line 121

def close
  zk.close
  @zk = nil
  
end

#consumergroup(name) ⇒ Object

Returns a Kazoo::Consumergroup instance for a given consumer name.

Note that this doesn’t register a new consumer group in Zookeeper; you wil have to call Kazoo::Consumergroup.create to do that.



60
61
62
# File 'lib/kazoo/cluster.rb', line 60

def consumergroup(name)
  Kazoo::Consumergroup.new(self, name)
end

#consumergroupsObject

Returns a list of consumer groups that are registered against the Kafka cluster.



49
50
51
52
53
54
# File 'lib/kazoo/cluster.rb', line 49

def consumergroups
  @consumergroups ||= begin
    consumers = zk.get_children(path: "/consumers")
    consumers.fetch(:children).map { |name| Kazoo::Consumergroup.new(self, name) }
  end
end

#create_topic(name, partitions: nil, replication_factor: nil, config: nil) ⇒ Object

Creates a topic on the Kafka cluster, with the provided number of partitions and replication factor.

Raises:

  • (ArgumentError)


82
83
84
85
86
87
# File 'lib/kazoo/cluster.rb', line 82

def create_topic(name, partitions: nil, replication_factor: nil, config: nil)
  raise ArgumentError, "partitions must be a positive integer" if Integer(partitions) <= 0
  raise ArgumentError, "replication_factor must be a positive integer" if Integer(replication_factor) <= 0

  Kazoo::Topic.create(self, name, partitions: Integer(partitions), replication_factor: Integer(replication_factor), config: config)
end

#partitionsObject

Returns a list of all partitions hosted by the cluster



90
91
92
# File 'lib/kazoo/cluster.rb', line 90

def partitions
  topics.values.flat_map(&:partitions)
end

#preferred_leader_election(partitions: nil) ⇒ Object

Triggers a preferred leader elections for the provided list of partitions. If no list of partitions is provided, the preferred leader will be elected for all partitions in the cluster.



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/kazoo/cluster.rb', line 107

def preferred_leader_election(partitions: nil)
  partitions = self.partitions if partitions.nil?
  result = zk.create(path: "/admin/preferred_replica_election", data: JSON.generate(version: 1, partitions: partitions))
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return true
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "Another preferred leader election is still in progress"
  else
    raise Kazoo::Error, "Failed to start preferred leadership election. Result code: #{result.fetch(:rc)}"
  end
end

#reset_metadataObject

Resets the locally cached list of brokers and topics, which will mean they will be fetched freshly from Zookeeper the next time they are requested.



96
97
98
# File 'lib/kazoo/cluster.rb', line 96

def 
  @topics, @brokers, @consumergroups = nil, nil, nil
end

#topic(name) ⇒ Object

Returns a Kazoo::Topic for a given topic name.



76
77
78
# File 'lib/kazoo/cluster.rb', line 76

def topic(name)
  Kazoo::Topic.new(self, name)
end

#topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS) ⇒ Object

Returns a hash of all the topics in the Kafka cluster, indexed by the topic name.



65
66
67
68
69
70
71
72
73
# File 'lib/kazoo/cluster.rb', line 65

def topics(preload: Kazoo::Topic::DEFAULT_PRELOAD_METHODS)
  @topics_mutex.synchronize do
    @topics ||= begin
      topics = zk.get_children(path: "/brokers/topics")
      raise Kazoo::Error, "Failed to list topics. Error code: #{topics.fetch(:rc)}" unless topics.fetch(:rc) == Zookeeper::Constants::ZOK
      preload_topics_from_names(topics.fetch(:children), preload: preload)
    end
  end
end

#under_replicated?Boolean

Returns true if any of the partitions hosted by the cluster

Returns:

  • (Boolean)


101
102
103
# File 'lib/kazoo/cluster.rb', line 101

def under_replicated?
  partitions.any?(&:under_replicated?)
end

#zkObject

Returns a zookeeper connection



15
16
17
18
19
# File 'lib/kazoo/cluster.rb', line 15

def zk
  @zk_mutex.synchronize do
    @zk ||= Zookeeper.new(zookeeper)
  end
end