Class: Kazoo::Consumergroup::Instance
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup::Instance
- Defined in:
- lib/kazoo/consumergroup.rb
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#subscription ⇒ Object
readonly
Returns the value of attribute subscription.
Class Method Summary collapse
Instance Method Summary collapse
- #claim_partition(partition) ⇒ Object
- #created_at ⇒ Object
- #deregister ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #hash ⇒ Object
-
#initialize(group, id: nil, subscription: nil) ⇒ Instance
constructor
A new instance of Instance.
- #inspect ⇒ Object
- #register(subscription_deprecated = nil) ⇒ Object
- #registered? ⇒ Boolean
- #release_partition(partition) ⇒ Object
Constructor Details
#initialize(group, id: nil, subscription: nil) ⇒ Instance
Returns a new instance of Instance.
338 339 340 341 342 |
# File 'lib/kazoo/consumergroup.rb', line 338 def initialize(group, id: nil, subscription: nil) @group = group @id = id || self.class.generate_id @subscription = Kazoo::Subscription.build(subscription) unless subscription.nil? end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
336 337 338 |
# File 'lib/kazoo/consumergroup.rb', line 336 def group @group end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
336 337 338 |
# File 'lib/kazoo/consumergroup.rb', line 336 def id @id end |
#subscription ⇒ Object (readonly)
Returns the value of attribute subscription.
336 337 338 |
# File 'lib/kazoo/consumergroup.rb', line 336 def subscription @subscription end |
Class Method Details
.generate_id ⇒ Object
332 333 334 |
# File 'lib/kazoo/consumergroup.rb', line 332 def self.generate_id "#{Socket.gethostname}:#{SecureRandom.uuid}" end |
Instance Method Details
#claim_partition(partition) ⇒ Object
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
# File 'lib/kazoo/consumergroup.rb', line 393 def claim_partition(partition) result = cluster.zk.create( path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}", ephemeral: true, data: id, ) case result.fetch(:rc) when Zookeeper::Constants::ZOK return true when Zookeeper::Constants::ZNODEEXISTS raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!" else raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |
#created_at ⇒ Object
376 377 378 379 380 381 |
# File 'lib/kazoo/consumergroup.rb', line 376 def created_at result = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}") raise Kazoo::Error, "Failed to get instance details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK Time.at(result.fetch(:stat).mtime / 1000.0) end |
#deregister ⇒ Object
384 385 386 387 388 389 390 391 |
# File 'lib/kazoo/consumergroup.rb', line 384 def deregister result = cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}") if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::Error, "Failed to deregister instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}" end return self end |
#eql?(other) ⇒ Boolean Also known as: ==
425 426 427 |
# File 'lib/kazoo/consumergroup.rb', line 425 def eql?(other) other.kind_of?(Kazoo::Consumergroup::Instance) && group == other.group && id == other.id end |
#hash ⇒ Object
421 422 423 |
# File 'lib/kazoo/consumergroup.rb', line 421 def hash [group, id].hash end |
#inspect ⇒ Object
417 418 419 |
# File 'lib/kazoo/consumergroup.rb', line 417 def inspect "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>" end |
#register(subscription_deprecated = nil) ⇒ Object
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/kazoo/consumergroup.rb', line 349 def register(subscription_deprecated = nil) # Don't provide the subscription here, but provide it when instantiating the consumer instance. @subscription = Kazoo::Subscription.build(subscription_deprecated) unless subscription_deprecated.nil? result = cluster.zk.create( path: "/consumers/#{group.name}/ids/#{id}", ephemeral: true, data: subscription.to_json, ) if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}" end subscription.topics(cluster).each do |topic| stat = cluster.zk.stat(path: "/consumers/#{group.name}/owners/#{topic.name}") unless stat.fetch(:stat).exists? result = cluster.zk.create(path: "/consumers/#{group.name}/owners/#{topic.name}") if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}" end end end return self end |
#registered? ⇒ Boolean
344 345 346 347 |
# File 'lib/kazoo/consumergroup.rb', line 344 def registered? stat = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}") stat.fetch(:stat).exists? end |
#release_partition(partition) ⇒ Object
410 411 412 413 414 415 |
# File 'lib/kazoo/consumergroup.rb', line 410 def release_partition(partition) result = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}") if result.fetch(:rc) != Zookeeper::Constants::ZOK raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}" end end |