Class: Kazoo::Consumergroup::Instance

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/consumergroup.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(group, id: nil, subscription: nil) ⇒ Instance

Returns a new instance of Instance.



338
339
340
341
342
# File 'lib/kazoo/consumergroup.rb', line 338

# Builds a consumer group instance.
#
# @param group the consumer group this instance belongs to; stored as given
# @param id identifier for this instance; when nil (or false) an identifier
#   is obtained from +self.class.generate_id+
# @param subscription when not nil, it is passed through
#   +Kazoo::Subscription.build+ and the result is stored
def initialize(group, id: nil, subscription: nil)
  @group = group
  @id = id || self.class.generate_id
  return if subscription.nil?

  @subscription = Kazoo::Subscription.build(subscription)
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



336
337
338
# File 'lib/kazoo/consumergroup.rb', line 336

# Reader for the group this instance was constructed with.
#
# @return [Object] the value held in +@group+
def group; @group; end

#id ⇒ Object (readonly)

Returns the value of attribute id.



336
337
338
# File 'lib/kazoo/consumergroup.rb', line 336

# Reader for this instance's identifier.
#
# @return [Object] the value held in +@id+
def id; @id; end

#subscription ⇒ Object (readonly)

Returns the value of attribute subscription.



336
337
338
# File 'lib/kazoo/consumergroup.rb', line 336

# Reader for the subscription attached to this instance, if any.
#
# @return [Object, nil] the value held in +@subscription+
def subscription; @subscription; end

Class Method Details

.generate_id ⇒ Object



332
333
334
# File 'lib/kazoo/consumergroup.rb', line 332

# Produces a fresh instance identifier made of the local hostname and a
# random UUID, separated by a colon.
#
# @return [String] "<hostname>:<uuid>"
def self.generate_id
  [Socket.gethostname, SecureRandom.uuid].join(":")
end

Instance Method Details

#claim_partition(partition) ⇒ Object



393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/kazoo/consumergroup.rb', line 393

# Claims a partition for this instance by creating an ephemeral owner node
# whose data is this instance's id.
#
# @param partition an object responding to +topic+ (with a +name+) and +id+
# @return [true] when the node was created
# @raise [Kazoo::PartitionAlreadyClaimed] when the owner node already exists
# @raise [Kazoo::Error] for any other return code
def claim_partition(partition)
  rc = cluster.zk.create(
    path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}",
    ephemeral: true,
    data: id
  ).fetch(:rc)

  return true if rc == Zookeeper::Constants::ZOK

  if rc == Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  end

  raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end

#created_at ⇒ Object

Raises:

  • (Kazoo::Error)



376
377
378
379
380
381
# File 'lib/kazoo/consumergroup.rb', line 376

# Returns the time at which this instance's registration node was created.
#
# The timestamp is read from the stat of "/consumers/<group>/ids/<id>" and
# divided by 1000.0 before being handed to Time.at (i.e. it is treated as
# milliseconds since the epoch).
#
# @return [Time]
# @raise [Kazoo::Error] when the stat call does not return ZOK
def created_at
  result = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  raise Kazoo::Error, "Failed to get instance details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

  # Use ctime (creation time) rather than mtime: mtime moves whenever the
  # node's data is rewritten, which would make "created at" drift.
  Time.at(result.fetch(:stat).ctime / 1000.0)
end

#deregister ⇒ Object



384
385
386
387
388
389
390
391
# File 'lib/kazoo/consumergroup.rb', line 384

# Removes this instance's registration node
# ("/consumers/<group>/ids/<id>").
#
# @return [self]
# @raise [Kazoo::Error] when the delete call does not return ZOK
def deregister
  rc = cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}").fetch(:rc)
  return self if rc == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to deregister instance #{id} for consumer group #{group.name}! Error code: #{rc}"
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


425
426
427
# File 'lib/kazoo/consumergroup.rb', line 425

# Two instances are considered equal when the other object is a
# Kazoo::Consumergroup::Instance with an equal group and an equal id.
#
# @param other [Object] the object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup::Instance)
  return false unless group == other.group

  id == other.id
end

#hash ⇒ Object



421
422
423
# File 'lib/kazoo/consumergroup.rb', line 421

# Hash code derived from the same pair of values that eql? compares
# (group and id).
#
# @return [Integer]
def hash
  identity = [group, id]
  identity.hash
end

#inspect ⇒ Object



417
418
419
# File 'lib/kazoo/consumergroup.rb', line 417

# Short human-readable representation showing the group name and the id.
#
# @return [String]
def inspect
  group_name = group.name
  "#<Kazoo::Consumergroup::Instance group=#{group_name} id=#{id}>"
end

#register(subscription_deprecated = nil) ⇒ Object



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/kazoo/consumergroup.rb', line 349

# Registers this instance under "/consumers/<group>/ids/<id>" as an
# ephemeral node whose data is the subscription's JSON, then makes sure an
# owners node exists for every topic returned by the subscription.
#
# @param subscription_deprecated deprecated way of supplying the
#   subscription; when not nil it replaces the stored subscription via
#   Kazoo::Subscription.build. Prefer passing the subscription when
#   instantiating the consumer instance.
# @return [self]
# @raise [ArgumentError] when no subscription is available
# @raise [Kazoo::ConsumerInstanceRegistrationFailed] when creating the
#   instance node or a topic's owners node fails
def register(subscription_deprecated = nil)
  # Don't provide the subscription here, but provide it when instantiating the consumer instance.
  @subscription = Kazoo::Subscription.build(subscription_deprecated) unless subscription_deprecated.nil?

  # Fail before touching Zookeeper: otherwise the instance node would be
  # created with a nil subscription's JSON and the method would then blow up
  # with NoMethodError, leaving that node behind.
  if subscription.nil?
    raise ArgumentError, "Cannot register instance #{id} for consumer group #{group.name} without a subscription"
  end

  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: subscription.to_json,
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.topics(cluster).each do |topic|
    owners_path = "/consumers/#{group.name}/owners/#{topic.name}"
    next if cluster.zk.stat(path: owners_path).fetch(:stat).exists?

    rc = cluster.zk.create(path: owners_path).fetch(:rc)
    # ZNODEEXISTS means another instance created the node between our stat
    # and create calls; the node we wanted is there, so that is not a failure.
    next if rc == Zookeeper::Constants::ZOK || rc == Zookeeper::Constants::ZNODEEXISTS

    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{rc}"
  end

  self
end

#registered? ⇒ Boolean

Returns:

  • (Boolean)


344
345
346
347
# File 'lib/kazoo/consumergroup.rb', line 344

# Tells whether this instance's registration node
# ("/consumers/<group>/ids/<id>") is reported as existing by the stat call.
#
# @return [Boolean]
def registered?
  instance_path = "/consumers/#{group.name}/ids/#{id}"
  cluster.zk.stat(path: instance_path).fetch(:stat).exists?
end

#release_partition(partition) ⇒ Object



410
411
412
413
414
415
# File 'lib/kazoo/consumergroup.rb', line 410

# Releases a partition by deleting its owner node
# ("/consumers/<group>/owners/<topic>/<partition id>").
#
# @param partition an object responding to +topic+ (with a +name+) and +id+
# @return [nil]
# @raise [Kazoo::Error] when the delete call does not return ZOK
def release_partition(partition)
  rc = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}").fetch(:rc)
  return if rc == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end