Class: Kazoo::Consumergroup::Instance

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/consumergroup.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(group, id: nil) ⇒ Instance

Returns a new instance of Instance.



230
231
232
233
# File 'lib/kazoo/consumergroup.rb', line 230

def initialize(group, id: nil)
  @group = group
  @id = id || self.class.generate_id
end

Instance Attribute Details

#groupObject (readonly)

Returns the value of attribute group.



228
229
230
# File 'lib/kazoo/consumergroup.rb', line 228

def group
  @group
end

#idObject (readonly)

Returns the value of attribute id.



228
229
230
# File 'lib/kazoo/consumergroup.rb', line 228

def id
  @id
end

Class Method Details

.generate_idObject



224
225
226
# File 'lib/kazoo/consumergroup.rb', line 224

def self.generate_id
  "#{Socket.gethostname}:#{SecureRandom.uuid}"
end

Instance Method Details

#claim_partition(partition) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/kazoo/consumergroup.rb', line 279

def claim_partition(partition)
  result = cluster.zk.create(
    path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}",
    ephemeral: true,
    data: id,
  )

  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    return true
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  else
    raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end
end

#created_atObject

Raises:



267
268
269
270
271
272
# File 'lib/kazoo/consumergroup.rb', line 267

def created_at
  result = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  raise Kazoo::Error, "Failed to get instance details. Error code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

  Time.at(result.fetch(:stat).mtime / 1000.0)
end

#deregisterObject



275
276
277
# File 'lib/kazoo/consumergroup.rb', line 275

def deregister
  cluster.zk.delete(path: "/consumers/#{group.name}/ids/#{id}")
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


311
312
313
# File 'lib/kazoo/consumergroup.rb', line 311

def eql?(other)
  other.kind_of?(Kazoo::Consumergroup::Instance) && group == other.group && id == other.id
end

#hashObject



307
308
309
# File 'lib/kazoo/consumergroup.rb', line 307

def hash
  [group, id].hash
end

#inspectObject



303
304
305
# File 'lib/kazoo/consumergroup.rb', line 303

def inspect
  "#<Kazoo::Consumergroup::Instance group=#{group.name} id=#{id}>"
end

#register(subscription) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/kazoo/consumergroup.rb', line 240

def register(subscription)
  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: JSON.generate({
      version: 1,
      timestamp: Time.now.to_i,
      pattern: "static",
      subscription: Hash[*subscription.flat_map { |topic| [topic.name, 1] } ]
    })
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.each do |topic|
    stat = cluster.zk.stat(path: "/consumers/#{group.name}/owners/#{topic.name}")
    unless stat.fetch(:stat).exists?
      result = cluster.zk.create(path: "/consumers/#{group.name}/owners/#{topic.name}")
      if result.fetch(:rc) != Zookeeper::Constants::ZOK
        raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
      end
    end
  end
end

#registered?Boolean

Returns:

  • (Boolean)


235
236
237
238
# File 'lib/kazoo/consumergroup.rb', line 235

def registered?
  stat = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  stat.fetch(:stat).exists?
end

#release_partition(partition) ⇒ Object



296
297
298
299
300
301
# File 'lib/kazoo/consumergroup.rb', line 296

def release_partition(partition)
  result = cluster.zk.delete(path: "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}")
  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{result.fetch(:rc)}"
  end
end