Class: Kazoo::Consumergroup
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup
- Defined in:
- lib/kazoo/consumergroup.rb
Defined Under Namespace
Classes: Instance
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #claimed_topics ⇒ Object
- #clean_stored_offsets(subscription = nil) ⇒ Object
- #clean_topic_claims(subscription = nil) ⇒ Object
- #commit_offset(partition, offset) ⇒ Object
- #create ⇒ Object
- #created_at ⇒ Object
- #destroy ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #exists? ⇒ Boolean
- #hash ⇒ Object
-
#initialize(cluster, name) ⇒ Consumergroup
constructor
A new instance of Consumergroup.
- #inspect ⇒ Object
- #instances ⇒ Object
- #instantiate(id: nil, subscription: nil) ⇒ Object
- #partition_claims ⇒ Object
- #partitions ⇒ Object
- #reset_all_offsets ⇒ Object
- #retrieve_all_offsets ⇒ Object
- #retrieve_offset(partition) ⇒ Object
- #retrieve_offsets(subscription = self.subscription) ⇒ Object
- #subscribed_topics ⇒ Object (also: #topics)
- #subscription ⇒ Object
- #unclaimed_partitions ⇒ Object
- #watch_instances(&block) ⇒ Object
- #watch_partition_claim(partition, &block) ⇒ Object
Constructor Details
#initialize(cluster, name) ⇒ Consumergroup
Returns a new instance of Consumergroup.
5 6 7 |
# File 'lib/kazoo/consumergroup.rb', line 5

# Builds a consumergroup handle from a cluster and a group name.
#
# @param cluster [Object] stored as-is and exposed through #cluster
# @param name [Object] stored as-is and exposed through #name
def initialize(cluster, name)
  @cluster = cluster
  @name = name
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3

# Read-only accessor.
#
# @return [Object] the value held in @cluster
def cluster
  @cluster
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/kazoo/consumergroup.rb', line 3

# Read-only accessor.
#
# @return [Object] the value held in @name
def name
  @name
end
Instance Method Details
#active? ⇒ Boolean
46 47 48 |
# File 'lib/kazoo/consumergroup.rb', line 46

# Tells whether #instances currently returns at least one entry.
#
# @return [Boolean] true when the instance list is non-empty
def active?
  !instances.length.zero?
end
#claimed_topics ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/kazoo/consumergroup.rb', line 92

# Lists the topics that have an entry under this group's
# "/consumers/<name>/owners" node.
#
# @return [Array] cluster.topic(child) for every child of the owners node;
#   an empty array when the owners node does not exist (ZNONODE)
# @raise [Kazoo::Error] for any other Zookeeper result code
def claimed_topics
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  case topic_result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    topic_result.fetch(:children).map { |topic_name| cluster.topic(topic_name) }
  when Zookeeper::Constants::ZNONODE
    []
  else
    # The message names the operation that actually failed (claimed, not subscribed, topics).
    raise Kazoo::Error, "Failed to get claimed topics. Result code: #{topic_result.fetch(:rc)}"
  end
end
#clean_stored_offsets(subscription = nil) ⇒ Object
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/kazoo/consumergroup.rb', line 279

# Deletes the stored-offset subtree of every topic listed under
# "/consumers/<name>/offsets" that is not part of the given subscription.
# One thread is started per listed topic and all of them are joined.
#
# @param subscription [Object, nil] passed through Kazoo::Subscription.build;
#   when nil, the group's own #subscription is used
# @return [Array<Thread>] the joined worker threads
# @raise [Kazoo::Error] when the offsets node cannot be listed
def clean_stored_offsets(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  topics_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  if topics_result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to retrieve list of topics. Error code: #{topics_result.fetch(:rc)}"
  end

  # Resolve the subscription once instead of once per worker thread.
  wanted_topics = subscription.topics(cluster)

  threads = topics_result.fetch(:children).map do |topic_name|
    Thread.new do
      # Per-thread flag: the class-level setter would change the whole process.
      Thread.current.abort_on_exception = true
      topic = cluster.topic(topic_name)
      unless wanted_topics.include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/offsets/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
#clean_topic_claims(subscription = nil) ⇒ Object
264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/kazoo/consumergroup.rb', line 264

# Deletes the "/consumers/<name>/owners/<topic>" subtree of every claimed
# topic that is not part of the given subscription. One thread is started
# per claimed topic and all of them are joined.
#
# @param subscription [Object, nil] passed through Kazoo::Subscription.build;
#   when nil, the group's own #subscription is used
# @return [Array<Thread>] the joined worker threads
def clean_topic_claims(subscription = nil)
  subscription = subscription.nil? ? self.subscription : Kazoo::Subscription.build(subscription)

  claimed = claimed_topics
  # Resolve the subscription once instead of once per worker thread.
  wanted_topics = subscription.topics(cluster)

  threads = claimed.map do |topic|
    Thread.new do
      # Per-thread flag: the class-level setter would change the whole process.
      Thread.current.abort_on_exception = true
      unless wanted_topics.include?(topic)
        cluster.send(:recursive_delete, path: "/consumers/#{name}/owners/#{topic.name}")
      end
    end
  end

  threads.each(&:join)
end
#commit_offset(partition, offset) ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/kazoo/consumergroup.rb', line 245

# Writes offset + 1 (as a string) to the partition's offset node,
# "/consumers/<name>/offsets/<topic>/<partition id>". When the node is
# missing it is created recursively and the write is attempted once more.
#
# @param partition [Object] must respond to #topic (with #name) and #id
# @param offset [Integer] the offset being committed
# @return [nil]
# @raise [Kazoo::Error] when the final write does not return ZOK
def commit_offset(partition, offset)
  znode = "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}"
  payload = (offset + 1).to_s
  write = -> { cluster.zk.set(path: znode, data: payload) }

  outcome = write.call
  if outcome.fetch(:rc) == Zookeeper::Constants::ZNONODE
    cluster.send(:recursive_create, path: znode)
    outcome = write.call
  end

  return if outcome.fetch(:rc) == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to commit offset #{offset} for partition #{partition.topic.name}/#{partition.id}. Error code: #{outcome.fetch(:rc)}"
end
#create ⇒ Object
9 10 11 12 13 |
# File 'lib/kazoo/consumergroup.rb', line 9 def create cluster.send(:recursive_create, path: "/consumers/#{name}/ids") cluster.send(:recursive_create, path: "/consumers/#{name}/owners") cluster. end |
#created_at ⇒ Object
25 26 27 28 29 30 |
# File 'lib/kazoo/consumergroup.rb', line 25

# Returns the creation time of the group's "/consumers/<name>" node.
#
# The stat's ctime (creation time) is used; mtime would move forward
# whenever the node's data is rewritten.
#
# @return [Time] the stat's ctime, divided by 1000.0 and handed to Time.at
# @raise [Kazoo::Error] when the stat call does not return ZOK
def created_at
  result = cluster.zk.stat(path: "/consumers/#{name}")
  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get consumer details. Error code: #{result.fetch(:rc)}"
  end

  Time.at(result.fetch(:stat).ctime / 1000.0)
end
#destroy ⇒ Object
15 16 17 18 |
# File 'lib/kazoo/consumergroup.rb', line 15 def destroy cluster.send(:recursive_delete, path: "/consumers/#{name}") cluster. end |
#eql?(other) ⇒ Boolean Also known as: ==
302 303 304 |
# File 'lib/kazoo/consumergroup.rb', line 302

# Value equality: another Kazoo::Consumergroup with an equal cluster and
# an equal name.
#
# @param other [Object] the object to compare with
# @return [Boolean]
def eql?(other)
  return false unless other.is_a?(Kazoo::Consumergroup)

  cluster == other.cluster && name == other.name
end
#exists? ⇒ Boolean
20 21 22 23 |
# File 'lib/kazoo/consumergroup.rb', line 20

# Asks Zookeeper for the stat of "/consumers/<name>" and reports whether
# that stat says the node exists. The result code is not inspected.
#
# @return [Boolean] the value of the stat's #exists?
def exists?
  cluster.zk.stat(path: "/consumers/#{name}").fetch(:stat).exists?
end
#hash ⇒ Object
308 309 310 |
# File 'lib/kazoo/consumergroup.rb', line 308

# Hash code derived from the same pair of values that #eql? compares.
#
# @return [Integer] the hash of the [cluster, name] array
def hash
  identity = [cluster, name]
  identity.hash
end
#inspect ⇒ Object
298 299 300 |
# File 'lib/kazoo/consumergroup.rb', line 298

# Short human-readable representation that shows only the group name.
#
# @return [String]
def inspect
  "#<Kazoo::Consumergroup name=#{name}>"
end
#instances ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/kazoo/consumergroup.rb', line 50

# Lists the instances registered under "/consumers/<name>/ids".
#
# @return [Object] the result of instances_with_subscription for the
#   children of the ids node; an empty array when the node does not
#   exist (ZNONODE)
# @raise [Kazoo::Error] for any other Zookeeper result code
def instances
  result = cluster.zk.get_children(path: "/consumers/#{name}/ids")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    instances_with_subscription(result.fetch(:children))
  when Zookeeper::Constants::ZNONODE
    []
  else
    raise Kazoo::Error, "Failed getting a list of running instances for #{name}. Error code: #{result.fetch(:rc)}"
  end
end
#instantiate(id: nil, subscription: nil) ⇒ Object
32 33 34 |
# File 'lib/kazoo/consumergroup.rb', line 32

# Builds an Instance bound to this consumergroup.
#
# @param id [Object, nil] forwarded to Instance.new as id:
# @param subscription [Object, nil] forwarded to Instance.new as subscription:
# @return [Instance]
def instantiate(id: nil, subscription: nil)
  options = { id: id, subscription: subscription }
  Instance.new(self, **options)
end
#partition_claims ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/kazoo/consumergroup.rb', line 127

# Collects which instance id is stored for every partition node below
# "/consumers/<name>/owners". One thread is started per topic, and inside
# it one thread per partition node; results are merged under a mutex.
#
# @return [Hash] partition => instantiate(id: <node data>); an empty hash
#   when the owners node does not exist (ZNONODE)
# @raise [Kazoo::Error] when any listing or read returns an unexpected
#   result code
def partition_claims
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/owners")
  rc = topic_result.fetch(:rc)
  return {} if rc == Zookeeper::Constants::ZNONODE
  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get partition claims. Result code: #{topic_result.fetch(:rc)}"
  end

  claims = {}
  lock = Mutex.new

  topic_workers = topic_result.fetch(:children).map do |topic_name|
    Thread.new do
      topic = cluster.topic(topic_name)

      partition_result = cluster.zk.get_children(path: "/consumers/#{name}/owners/#{topic.name}")
      if partition_result.fetch(:rc) != Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to get partition claims. Result code: #{partition_result.fetch(:rc)}"
      end

      partition_workers = partition_result.fetch(:children).map do |partition_id|
        Thread.new do
          partition = topic.partition(partition_id.to_i)

          claim_result = cluster.zk.get(path: "/consumers/#{name}/owners/#{topic.name}/#{partition.id}")
          if claim_result.fetch(:rc) != Zookeeper::Constants::ZOK
            raise Kazoo::Error, "Failed to get partition claims. Result code: #{claim_result.fetch(:rc)}"
          end

          lock.synchronize { claims[partition] = instantiate(id: claim_result.fetch(:data)) }
        end
      end
      partition_workers.each(&:join)
    end
  end
  topic_workers.each(&:join)

  claims
end
#partitions ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/kazoo/consumergroup.rb', line 110

# Gathers the partitions of every topic returned by #topics, fetching each
# topic's partitions in its own thread and merging them under a mutex.
# The order of the result follows thread completion.
#
# @return [Array] all partitions of all topics
def partitions
  collected = []
  lock = Mutex.new

  workers = topics.map do |topic|
    Thread.new do
      found = topic.partitions
      lock.synchronize { collected.concat(found) }
    end
  end
  workers.each(&:join)

  collected
end
#reset_all_offsets ⇒ Object
260 261 262 |
# File 'lib/kazoo/consumergroup.rb', line 260

# Recursively deletes the group's "/consumers/<name>/offsets" node.
#
# @return [Object] whatever the cluster's recursive_delete returns
def reset_all_offsets
  offsets_root = "/consumers/#{name}/offsets"
  cluster.send(:recursive_delete, path: offsets_root)
end
#retrieve_all_offsets ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/kazoo/consumergroup.rb', line 171

# Reads every offset stored below "/consumers/<name>/offsets". One thread
# is started per topic node and, inside it, one per partition node; the
# results are merged under a mutex.
#
# @return [Hash] partition => Integer offset, or nil when the partition's
#   node no longer exists at read time; an empty hash when the offsets
#   node itself does not exist (ZNONODE)
# @raise [Kazoo::Error] when any listing or read returns an unexpected
#   result code
def retrieve_all_offsets
  topic_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets")
  case topic_result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    return {}
  else
    raise Kazoo::Error, "Failed to get topic offsets. Result code: #{topic_result.fetch(:rc)}"
  end

  offsets, mutex = {}, Mutex.new
  topic_threads = topic_result.fetch(:children).map do |topic_name|
    Thread.new do
      # Per-thread flag: the class-level setter would change the whole process.
      Thread.current.abort_on_exception = true

      topic = cluster.topic(topic_name)
      partition_result = cluster.zk.get_children(path: "/consumers/#{name}/offsets/#{topic.name}")
      if partition_result.fetch(:rc) != Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to get partition offsets. Result code: #{partition_result.fetch(:rc)}"
      end

      partition_threads = partition_result.fetch(:children).map do |partition_id|
        Thread.new do
          Thread.current.abort_on_exception = true

          partition = topic.partition(partition_id.to_i)
          offset_result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          offset = case offset_result.fetch(:rc)
          when Zookeeper::Constants::ZOK
            offset_result.fetch(:data).to_i
          when Zookeeper::Constants::ZNONODE
            nil
          else
            raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{offset_result.fetch(:rc)}"
          end
          mutex.synchronize { offsets[partition] = offset }
        end
      end
      partition_threads.each(&:join)
    end
  end

  topic_threads.each(&:join)
  offsets
end
#retrieve_offset(partition) ⇒ Object
162 163 164 165 166 167 168 169 |
# File 'lib/kazoo/consumergroup.rb', line 162

# Reads the offset stored at
# "/consumers/<name>/offsets/<topic>/<partition id>".
#
# @param partition [Object] must respond to #topic (with #name) and #id
# @return [Integer, nil] the node data converted with to_i, or nil when
#   the node does not exist (ZNONODE)
# @raise [Kazoo::Error] for any other Zookeeper result code
def retrieve_offset(partition)
  result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}")
  rc = result.fetch(:rc)

  return result.fetch(:data).to_i if rc == Zookeeper::Constants::ZOK
  return nil if rc == Zookeeper::Constants::ZNONODE

  raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
end
#retrieve_offsets(subscription = self.subscription) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/kazoo/consumergroup.rb', line 213

# Reads the stored offset of every partition of every topic in the given
# subscription. One thread is started per topic and, inside it, one per
# partition; the results are merged under a mutex.
#
# @param subscription [Object] passed through Kazoo::Subscription.build;
#   defaults to the group's own #subscription
# @return [Hash] partition => Integer offset, or nil when no offset node
#   exists for that partition (ZNONODE)
# @raise [Kazoo::Error] when a read returns an unexpected result code
def retrieve_offsets(subscription = self.subscription)
  subscription = Kazoo::Subscription.build(subscription)

  offsets, mutex = {}, Mutex.new
  topic_threads = subscription.topics(cluster).map do |topic|
    Thread.new do
      # Per-thread flag: the class-level setter would change the whole process.
      Thread.current.abort_on_exception = true

      partition_threads = topic.partitions.map do |partition|
        Thread.new do
          Thread.current.abort_on_exception = true

          offset_result = cluster.zk.get(path: "/consumers/#{name}/offsets/#{topic.name}/#{partition.id}")
          offset = case offset_result.fetch(:rc)
          when Zookeeper::Constants::ZOK
            offset_result.fetch(:data).to_i
          when Zookeeper::Constants::ZNONODE
            nil
          else
            raise Kazoo::Error, "Failed to retrieve offset for #{partition.key}. Error code: #{offset_result.fetch(:rc)}"
          end
          mutex.synchronize { offsets[partition] = offset }
        end
      end
      partition_threads.each(&:join)
    end
  end

  topic_threads.each(&:join)
  offsets
end
#subscribed_topics ⇒ Object Also known as: topics
104 105 106 |
# File 'lib/kazoo/consumergroup.rb', line 104

# Resolves the group's current #subscription against the cluster.
#
# @return [Object] the result of subscription.topics(cluster)
def subscribed_topics
  current = subscription
  current.topics(cluster)
end
#subscription ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/kazoo/consumergroup.rb', line 36

# Derives the group's subscription from its running instances. Nil
# subscriptions are ignored; the remaining ones must all be equal.
#
# @return [Object] the single subscription shared by the instances
# @raise [NoRunningInstances] when no instance reports a subscription
# @raise [InconsistentSubscriptions] when the instances disagree
def subscription
  reported = instances.map(&:subscription).compact
  if reported.empty?
    raise NoRunningInstances, "Consumergroup #{name} has no running instances; cannot determine subscription"
  end

  distinct = reported.uniq
  unless distinct.length == 1
    raise InconsistentSubscriptions, "Subscriptions of running instances are different from each other"
  end

  distinct.first
end
#unclaimed_partitions ⇒ Object
123 124 125 |
# File 'lib/kazoo/consumergroup.rb', line 123

# Partitions returned by #partitions that do not appear as a key in
# #partition_claims.
#
# @return [Array]
def unclaimed_partitions
  every_partition = partitions
  claimed = partition_claims.keys
  every_partition - claimed
end
#watch_instances(&block) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/kazoo/consumergroup.rb', line 62

# Lists the group's instances like #instances does, and passes a watcher
# callback built from the given block to the get_children call on
# "/consumers/<name>/ids".
#
# @yield handed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] a two-element array: the instance list (empty when the
#   ids node does not exist) and the watcher callback
# @raise [Kazoo::Error] for an unexpected Zookeeper result code
def watch_instances(&block)
  cb = Zookeeper::Callbacks::WatcherCallback.create(&block)
  result = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: cb)

  instances = case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    instances_with_subscription(result.fetch(:children))
  when Zookeeper::Constants::ZNONODE
    []
  else
    raise Kazoo::Error, "Failed getting a list of running instances for #{name}. Error code: #{result.fetch(:rc)}"
  end

  [instances, cb]
end
#watch_partition_claim(partition, &block) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/kazoo/consumergroup.rb', line 77

# Reads the claim node of a partition,
# "/consumers/<name>/owners/<topic>/<partition id>", passing a watcher
# callback built from the given block to the read.
#
# @param partition [Object] must respond to #topic (with #name) and #id
# @yield handed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] [instance, callback] when the node exists, where the
#   instance is built from the node's data; [nil, nil] when it does not
# @raise [Kazoo::Error] for any other Zookeeper result code
def watch_partition_claim(partition, &block)
  cb = Zookeeper::Callbacks::WatcherCallback.create(&block)

  result = cluster.zk.get(path: "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}", watcher: cb)
  rc = result.fetch(:rc)

  # Nobody is claiming this partition yet
  return [nil, nil] if rc == Zookeeper::Constants::ZNONODE

  if rc == Zookeeper::Constants::ZOK
    return [Kazoo::Consumergroup::Instance.new(self, id: result.fetch(:data)), cb]
  end

  raise Kazoo::Error, "Failed to set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
end