Class: Kazoo::Cluster
- Inherits: Object
- Inheritance hierarchy: Object → Kazoo::Cluster
- Defined in:
- lib/kazoo/cluster.rb
Instance Attribute Summary collapse
-
#zookeeper ⇒ Object
readonly
Returns the value of attribute zookeeper.
Instance Method Summary collapse
- #brokers ⇒ Object
- #close ⇒ Object
- #consumergroups ⇒ Object
-
#initialize(zookeeper) ⇒ Cluster
constructor
A new instance of Cluster.
- #partitions ⇒ Object
- #reset_metadata ⇒ Object
- #topics ⇒ Object
- #under_replicated? ⇒ Boolean
- #zk ⇒ Object
Constructor Details
#initialize(zookeeper) ⇒ Cluster
Returns a new instance of Cluster.
6 7 8 9 |
# File 'lib/kazoo/cluster.rb', line 6

# Remembers the Zookeeper connection argument and creates one lock per
# lazily-initialized resource (client handle, broker map, topic map).
#
# @param zookeeper [Object] value that #zk later passes to Zookeeper.new
def initialize(zookeeper)
  @zookeeper = zookeeper

  # Separate locks so that building one cache never blocks the others.
  @zk_mutex = Mutex.new
  @brokers_mutex = Mutex.new
  @topics_mutex = Mutex.new
end
Instance Attribute Details
#zookeeper ⇒ Object (readonly)
Returns the value of attribute zookeeper.
4 5 6 |
# File 'lib/kazoo/cluster.rb', line 4

# Read-only accessor for the value given to the constructor.
#
# @return [Object] the stored @zookeeper value
def zookeeper
  @zookeeper
end
Instance Method Details
#brokers ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/kazoo/cluster.rb', line 17

# Lists the brokers registered under "/brokers/ids", loading each broker's
# node in its own thread. The resulting map is memoized until @brokers is
# cleared.
#
# @return [Hash{Integer => Kazoo::Broker}] brokers keyed by integer broker id
# @raise [NoClusterRegistered] when "/brokers/ids" cannot be listed
# @raise [Kazoo::Error] when any individual broker node cannot be read
def brokers
  @brokers_mutex.synchronize do
    @brokers ||= begin
      brokers = zk.get_children(path: "/brokers/ids")

      if brokers.fetch(:rc) != Zookeeper::Constants::ZOK
        raise NoClusterRegistered, "No Kafka cluster registered on this Zookeeper location."
      end

      result, mutex = {}, Mutex.new

      # Keep the Thread objects themselves. ThreadGroup#list only reports
      # threads that are still alive, so a worker that failed before the join
      # loop started would never be joined and its exception would be lost.
      threads = brokers.fetch(:children).map do |id|
        Thread.new do
          broker_info = zk.get(path: "/brokers/ids/#{id}")
          raise Kazoo::Error, "Failed to retrieve broker info. Error code: #{broker_info.fetch(:rc)}" unless broker_info.fetch(:rc) == Zookeeper::Constants::ZOK

          broker = Kazoo::Broker.from_json(self, id, JSON.parse(broker_info.fetch(:data)))
          mutex.synchronize { result[id.to_i] = broker }
        end
      end

      # Thread#join re-raises a worker's exception here, which also prevents
      # a partial result from being memoized.
      threads.each(&:join)
      result
    end
  end
end
#close ⇒ Object
85 86 87 |
# File 'lib/kazoo/cluster.rb', line 85

# Closes the Zookeeper client handle returned by #zk.
#
# @return [Object] whatever the client's #close returns
def close
  client = zk
  client.close
end
#consumergroups ⇒ Object
43 44 45 46 47 48 |
# File 'lib/kazoo/cluster.rb', line 43

# Lists the children of "/consumers" and wraps each name in a
# Kazoo::Consumergroup. The list is memoized in @consumergroups.
#
# @return [Array<Kazoo::Consumergroup>] one entry per child of "/consumers"
# @raise [Kazoo::Error] when the "/consumers" listing does not return ZOK
def consumergroups
  @consumergroups ||= begin
    consumers = zk.get_children(path: "/consumers")

    # Check the result code the same way #topics does, so that a failed
    # lookup surfaces as a Kazoo::Error carrying the Zookeeper error code
    # instead of failing later on a missing children list.
    raise Kazoo::Error, "Failed to list consumer groups. Error code: #{consumers.fetch(:rc)}" unless consumers.fetch(:rc) == Zookeeper::Constants::ZOK

    consumers.fetch(:children).map { |name| Kazoo::Consumergroup.new(self, name) }
  end
end
#partitions ⇒ Object
73 74 75 |
# File 'lib/kazoo/cluster.rb', line 73

# Collects the partitions of every topic into one flat list.
#
# @return [Array] the concatenation of each topic's #partitions
def partitions
  all_topics = topics.values
  all_topics.flat_map { |topic| topic.partitions }
end
#reset_metadata ⇒ Object
77 78 79 |
# File 'lib/kazoo/cluster.rb', line 77

# Drops the memoized topic and broker maps so the next call to #topics or
# #brokers reloads them from Zookeeper.
#
# @return [Array<nil>] the right-hand side of the assignment, [nil, nil]
def reset_metadata
  @topics, @brokers = nil, nil
end
#topics ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/kazoo/cluster.rb', line 50

# Lists the topics registered under "/brokers/topics", loading each topic's
# node in its own thread. The resulting map is memoized until @topics is
# cleared.
#
# @return [Hash{String => Kazoo::Topic}] topics keyed by topic name
# @raise [Kazoo::Error] when the topic list or any topic node cannot be read
def topics
  @topics_mutex.synchronize do
    @topics ||= begin
      topics = zk.get_children(path: "/brokers/topics")
      raise Kazoo::Error, "Failed to list topics. Error code: #{topics.fetch(:rc)}" unless topics.fetch(:rc) == Zookeeper::Constants::ZOK

      result, mutex = {}, Mutex.new

      # Keep the Thread objects themselves. ThreadGroup#list only reports
      # threads that are still alive, so a worker that failed before the join
      # loop started would never be joined and its exception would be lost.
      threads = topics.fetch(:children).map do |name|
        Thread.new do
          topic_info = zk.get(path: "/brokers/topics/#{name}")
          raise Kazoo::Error, "Failed to get topic info. Error code: #{topic_info.fetch(:rc)}" unless topic_info.fetch(:rc) == Zookeeper::Constants::ZOK

          topic = Kazoo::Topic.from_json(self, name, JSON.parse(topic_info.fetch(:data)))
          mutex.synchronize { result[name] = topic }
        end
      end

      # Thread#join re-raises a worker's exception here, which also prevents
      # a partial result from being memoized.
      threads.each(&:join)
      result
    end
  end
end
#under_replicated? ⇒ Boolean
81 82 83 |
# File 'lib/kazoo/cluster.rb', line 81

# Reports whether any partition in the cluster is under-replicated.
#
# @return [Boolean] true when at least one partition answers true to
#   #under_replicated?
def under_replicated?
  partitions.any? { |partition| partition.under_replicated? }
end
#zk ⇒ Object
11 12 13 14 15 |
# File 'lib/kazoo/cluster.rb', line 11

# Returns the Zookeeper client, creating it on first use while holding
# @zk_mutex.
#
# @return [Object] the memoized result of Zookeeper.new(zookeeper)
def zk
  @zk_mutex.synchronize do
    @zk = Zookeeper.new(zookeeper) unless @zk
    @zk
  end
end