Class: Kazoo::Consumergroup
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup
- Defined in:
- lib/kazoo/consumergroup.rb
Defined Under Namespace
Classes: Instance
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #commit_offset(partition, offset) ⇒ Object
- #create ⇒ Object
- #created_at ⇒ Object
- #destroy ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #exists? ⇒ Boolean
- #hash ⇒ Object
-
#initialize(cluster, name) ⇒ Consumergroup
constructor
A new instance of Consumergroup.
- #inspect ⇒ Object
- #instances ⇒ Object
- #instantiate(id: nil) ⇒ Object
- #partition_claims ⇒ Object
- #partitions ⇒ Object
- #reset_all_offsets ⇒ Object
- #retrieve_all_offsets ⇒ Object
- #retrieve_offset(partition) ⇒ Object
- #topics ⇒ Object
- #unclaimed_partitions ⇒ Object
- #watch_instances(&block) ⇒ Object
- #watch_partition_claim(partition, &block) ⇒ Object
Constructor Details
#initialize(cluster, name) ⇒ Consumergroup
Returns a new instance of Consumergroup.
5 6 7 |
# File 'lib/kazoo/consumergroup.rb', line 5 def initialize(cluster, name) @cluster, @name = cluster, name end |
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3 def cluster @cluster end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3 def name @name end |
Instance Method Details
#active? ⇒ Boolean
35 36 37 |
# File 'lib/kazoo/consumergroup.rb', line 35 def active? instances.length > 0 end |
#commit_offset(partition, offset) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/kazoo/consumergroup.rb', line 189 def commit_offset(partition, offset) partition_offset_path = "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}" next_offset_data = (offset + 1).to_s result = cluster.zk.set(path: partition_offset_path, data: next_offset_data) if result.fetch(:rc) == Zookeeper::Constants::ZNONODE cluster.send(:recursive_create, path: File.dirname(partition_offset_path)) result = cluster.zk.create(path: partition_offset_path, data: next_offset_data) end if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |
#create ⇒ Object
9 10 11 12 |
# File 'lib/kazoo/consumergroup.rb', line 9 def create cluster.send(:recursive_create, path: "/consumers/#{name}/ids") cluster.send(:recursive_create, path: "/consumers/#{name}/owners") end |
#created_at ⇒ Object
23 24 25 26 27 28 |
# File 'lib/kazoo/consumergroup.rb', line 23 def created_at result = cluster.zk.stat(path: "/consumers/#{name}") raise Kazoo::Error, "Failed to get consumer details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK Time.at(result.fetch(:stat).mtime / 1000.0) end |
#destroy ⇒ Object
14 15 16 |
# File 'lib/kazoo/consumergroup.rb', line 14 def destroy cluster.send(:recursive_delete, path: "/consumers/#{name}") end |
#eql?(other) ⇒ Boolean Also known as: ==
212 213 214 |
# File 'lib/kazoo/consumergroup.rb', line 212 def eql?(other) other.kind_of?(Kazoo::Consumergroup) && cluster == other.cluster && name == other.name end |
#exists? ⇒ Boolean
18 19 20 21 |
# File 'lib/kazoo/consumergroup.rb', line 18 def exists? stat = cluster.zk.stat(path: "/consumers/#{name}") stat.fetch(:stat).exists? end |
#hash ⇒ Object
218 219 220 |
# File 'lib/kazoo/consumergroup.rb', line 218 def hash [cluster, name].hash end |
#inspect ⇒ Object
208 209 210 |
# File 'lib/kazoo/consumergroup.rb', line 208 def inspect "#<Kazoo::Consumergroup name=#{name}>" end |
#instances ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/kazoo/consumergroup.rb', line 39 def instances result = cluster.zk.get_children(path: "/consumers/#{name}/ids") case result.fetch(:rc) when Zookeeper::Constants::ZOK result.fetch(:children).map { |id| Instance.new(self, id: id) } when Zookeeper::Constants::ZNONODE [] else raise Kazoo::Error, "Failed getting a list of runniong instances for #{name}. Error code: #{result.fetch(:rc)}" end end |
#instantiate(id: nil) ⇒ Object
31 32 33 |
# File 'lib/kazoo/consumergroup.rb', line 31 def instantiate(id: nil) Instance.new(self, id: id) end |
#partition_claims ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/kazoo/consumergroup.rb', line 108 def partition_claims topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners") case topic_result.fetch(:rc) when Zookeeper::Constants::ZOK; # continue when Zookeeper::Constants::ZNONODE; return {} else raise Kazoo::Error, "Failed to get partition claims. Result code: #{topic_result.fetch(:rc)}" end partition_claims, threads, mutex = {}, [], Mutex.new topic_result.fetch(:children).each do |topic_name| threads << Thread.new do topic = cluster.topic(topic_name) partition_result = cluster.zk.get_children(path: "/consumers/#{name}/owners/#{topic.name}") raise Kazoo::Error, "Failed to get partition claims. Result code: #{partition_result.fetch(:rc)}" if partition_result.fetch(:rc) != Zookeeper::Constants::ZOK partition_threads = [] partition_result.fetch(:children).each do |partition_id| partition_threads << Thread.new do partition = topic.partition(partition_id.to_i) claim_result =cluster.zk.get(path: "/consumers/#{name}/owners/#{topic.name}/#{partition.id}") raise Kazoo::Error, "Failed to get partition claims. Result code: #{claim_result.fetch(:rc)}" if claim_result.fetch(:rc) != Zookeeper::Constants::ZOK mutex.synchronize { partition_claims[partition] = instantiate(id: claim_result.fetch(:data)) } end end partition_threads.each(&:join) end end threads.each(&:join) return partition_claims end |
#partitions ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/kazoo/consumergroup.rb', line 91 def partitions partitions, threads, mutex = [], [], Mutex.new topics.each do |topic| threads << Thread.new do topic_partitions = topic.partitions mutex.synchronize { partitions.concat(topic_partitions) } end end threads.each(&:join) return partitions end |
#reset_all_offsets ⇒ Object
204 205 206 |
# File 'lib/kazoo/consumergroup.rb', line 204 def reset_all_offsets cluster.send(:recursive_delete, path: "/consumers/#{name}/offsets") end |
#retrieve_all_offsets ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/kazoo/consumergroup.rb', line 152 def retrieve_all_offsets topic_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets") case topic_result.fetch(:rc) when Zookeeper::Constants::ZOK; # continue when Zookeeper::Constants::ZNONODE; return {} else raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{topic_result.fetch(:rc)}" end offsets, threads, mutex = {}, [], Mutex.new topic_result.fetch(:children).each do |topic_name| threads << Thread.new do Thread.abort_on_exception = true topic = Kazoo::Topic.new(cluster, topic_name) partition_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets/#{topic.name}") raise Kazoo::Error, "Failed to retrieve offsets. Error code: #{partition_result.fetch(:rc)}" if partition_result.fetch(:rc) != Zookeeper::Constants::ZOK partition_threads = [] partition_result.fetch(:children).each do |partition_id| partition_threads << Thread.new do Thread.abort_on_exception = true partition = topic.partition(partition_id.to_i) offset_result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}") raise Kazoo::Error, "Failed to retrieve offsets. Error code: #{offset_result.fetch(:rc)}" if offset_result.fetch(:rc) != Zookeeper::Constants::ZOK mutex.synchronize { offsets[partition] = offset_result.fetch(:data).to_i } end end partition_threads.each(&:join) end end threads.each(&:join) return offsets end |
#retrieve_offset(partition) ⇒ Object
143 144 145 146 147 148 149 150 |
# File 'lib/kazoo/consumergroup.rb', line 143 def retrieve_offset(partition) result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}") case result.fetch(:rc) when Zookeeper::Constants::ZOK; result.fetch(:data).to_i when Zookeeper::Constants::ZNONODE; nil else raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |
#topics ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/kazoo/consumergroup.rb', line 79 def topics topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners") case topic_result.fetch(:rc) when Zookeeper::Constants::ZOK topic_result.fetch(:children).map { |topic_name| cluster.topic(topic_name) } when Zookeeper::Constants::ZNONODE [] else raise Kazoo::Error, "Failed to get subscribed topics. Result code: #{topic_result.fetch(:rc)}" end end |
#unclaimed_partitions ⇒ Object
104 105 106 |
# File 'lib/kazoo/consumergroup.rb', line 104 def unclaimed_partitions partitions - partition_claims.keys end |
#watch_instances(&block) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/kazoo/consumergroup.rb', line 51 def watch_instances(&block) cb = Zookeeper::Callbacks::WatcherCallback.create(&block) result = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: cb) if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::Error, "Failed to watch instances. Error code: #{result.fetch(:rc)}" end instances = result.fetch(:children).map { |id| Instance.new(self, id: id) } [instances, cb] end |
#watch_partition_claim(partition, &block) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/kazoo/consumergroup.rb', line 64 def watch_partition_claim(partition, &block) cb = Zookeeper::Callbacks::WatcherCallback.create(&block) result = cluster.zk.get(path: "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}", watcher: cb) case result.fetch(:rc) when Zookeeper::Constants::ZNONODE # Nobody is claiming this partition yet [nil, nil] when Zookeeper::Constants::ZOK [Kazoo::Consumergroup::Instance.new(self, id: result.fetch(:data)), cb] else raise Kazoo::Error, "Failed to set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |