Class: Kazoo::Consumergroup::Instance
- Inherits:
-
Object
- Object
- Kazoo::Consumergroup::Instance
- Defined in:
- lib/kazoo/consumergroup.rb
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Class Method Summary collapse
- .generate_id ⇒ Object
Instance Method Summary collapse
- #claim_partition(partition) ⇒ Object
- #created_at ⇒ Object
- #deregister ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #hash ⇒ Object
-
#initialize(group, id: nil) ⇒ Instance
constructor
A new instance of Instance.
- #inspect ⇒ Object
- #register(subscription) ⇒ Object
- #registered? ⇒ Boolean
- #release_partition(partition) ⇒ Object
Constructor Details
#initialize(group, id: nil) ⇒ Instance
Returns a new instance of Instance.
230 231 232 233 |
# File 'lib/kazoo/consumergroup.rb', line 230
# Builds an instance handle for +group+.
#
# @param group [Object] the consumer group this instance belongs to; stored as given.
# @param id [Object, nil] explicit instance id. When it is nil (or false), an id
#   is obtained from the class-level +generate_id+ instead.
def initialize(group, id: nil)
  @group = group
  @id = id
  # Only fall back to a generated id when the caller did not supply one.
  @id ||= self.class.generate_id
end
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
228 229 230 |
# File 'lib/kazoo/consumergroup.rb', line 228
# Read-only accessor for the +@group+ instance variable.
#
# @return [Object] the current value of +@group+
def group
  @group
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
228 229 230 |
# File 'lib/kazoo/consumergroup.rb', line 228
# Read-only accessor for the +@id+ instance variable.
#
# @return [Object] the current value of +@id+
def id
  @id
end
Class Method Details
.generate_id ⇒ Object
224 225 226 |
# File 'lib/kazoo/consumergroup.rb', line 224
# Produces a fresh instance id of the form "<hostname>:<uuid>".
#
# @return [String] the local hostname (Socket.gethostname) and a random UUID
#   (SecureRandom.uuid), joined by a colon
def self.generate_id
  host = Socket.gethostname
  uuid = SecureRandom.uuid
  "#{host}:#{uuid}"
end
Instance Method Details
#claim_partition(partition) ⇒ Object
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/kazoo/consumergroup.rb', line 279
# Claims +partition+ for this instance by creating its owner node under
# /consumers/<group>/owners/<topic>/<partition id>, with this instance's id
# as the node data and +ephemeral: true+.
#
# @param partition [#topic, #id] the partition to claim; +partition.topic.name+
#   and +partition.id+ are used to build the node path.
# @return [true] when the create call reports ZOK
# @raise [Kazoo::PartitionAlreadyClaimed] when the create call reports ZNODEEXISTS
# @raise [Kazoo::Error] for any other return code
def claim_partition(partition)
  owner_path = "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}"
  rc = cluster.zk.create(path: owner_path, ephemeral: true, data: id).fetch(:rc)

  return true if rc == Zookeeper::Constants::ZOK

  if rc == Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  end

  raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end
#created_at ⇒ Object
267 268 269 270 271 272 |
# File 'lib/kazoo/consumergroup.rb', line 267
# Reports when this instance's registration node
# (/consumers/<group>/ids/<id>) was created.
#
# The node's creation timestamp (+ctime+) is used rather than its
# last-modified timestamp (+mtime+), so the answer stays correct even if the
# node's data is ever rewritten after creation.
#
# @return [Time] the stat's +ctime+ (treated as milliseconds) converted to a Time
# @raise [Kazoo::Error] when the stat call reports anything other than ZOK
def created_at
  result = cluster.zk.stat(path: "/consumers/#{group.name}/ids/#{id}")
  rc = result.fetch(:rc)
  raise Kazoo::Error, "Failed to get instance details. Error code: #{rc}" if rc != Zookeeper::Constants::ZOK

  # Divide by a Float so sub-second precision survives the conversion.
  Time.at(result.fetch(:stat).ctime / 1000.0)
end
#deregister ⇒ Object
275 276 277 |
# File 'lib/kazoo/consumergroup.rb', line 275
# Deletes this instance's registration node (/consumers/<group>/ids/<id>).
#
# NOTE(review): the return code of the delete call is not inspected here, so
# a failed delete is not reported as an exception — confirm that this
# best-effort behaviour is intended.
#
# @return [Object] whatever the underlying delete call returns
def deregister
  registration_path = "/consumers/#{group.name}/ids/#{id}"
  cluster.zk.delete(path: registration_path)
end
#eql?(other) ⇒ Boolean Also known as: ==
311 312 313 |
# File 'lib/kazoo/consumergroup.rb', line 311
# Value equality: two instances are equal when both are
# Kazoo::Consumergroup::Instance objects with equal +group+ and equal +id+.
# The documentation lists #== as an alias of this method.
#
# @param other [Object] the object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.is_a?(Kazoo::Consumergroup::Instance)

  group == other.group && id == other.id
end
#hash ⇒ Object
307 308 309 |
# File 'lib/kazoo/consumergroup.rb', line 307
# Hash code derived from the [group, id] pair.
#
# @return [Integer] the hash of the two-element array [group, id]
def hash
  identity = [group, id]
  identity.hash
end
#inspect ⇒ Object
303 304 305 |
# File 'lib/kazoo/consumergroup.rb', line 303
# Short human-readable description showing the group name and instance id.
#
# @return [String]
def inspect
  group_name = group.name
  "#<Kazoo::Consumergroup::Instance group=#{group_name} id=#{id}>"
end
#register(subscription) ⇒ Object
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/kazoo/consumergroup.rb', line 240
# Registers this instance in its consumer group and makes sure an owners node
# exists for every subscribed topic.
#
# Step 1 creates /consumers/<group>/ids/<id> (with +ephemeral: true+) holding
# a JSON document with keys version, timestamp, pattern and subscription
# (a map of topic name => 1).
# Step 2 creates /consumers/<group>/owners/<topic> for each topic whose node
# does not exist yet.
#
# @param subscription [Enumerable<#name>] the topics this instance subscribes to
# @return [Object] the +subscription+ that was passed in (the result of +each+)
# @raise [Kazoo::ConsumerInstanceRegistrationFailed] when the instance node
#   cannot be created, or when a topic's owners node cannot be created for a
#   reason other than it already existing
def register(subscription)
  # Build the topic map directly instead of splatting every topic into an
  # argument list.
  topic_counts = subscription.each_with_object({}) { |topic, counts| counts[topic.name] = 1 }

  result = cluster.zk.create(
    path: "/consumers/#{group.name}/ids/#{id}",
    ephemeral: true,
    data: JSON.generate({
      version: 1,
      timestamp: Time.now.to_i,
      pattern: "static",
      subscription: topic_counts,
    })
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.each do |topic|
    owners_path = "/consumers/#{group.name}/owners/#{topic.name}"
    next if cluster.zk.stat(path: owners_path).fetch(:stat).exists?

    rc = cluster.zk.create(path: owners_path).fetch(:rc)

    # The stat above and this create are not atomic: another instance may
    # create the same node in between. ZNODEEXISTS therefore means the node
    # we wanted is present, which is success for our purposes.
    next if rc == Zookeeper::Constants::ZOK || rc == Zookeeper::Constants::ZNODEEXISTS

    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{rc}"
  end
end
#registered? ⇒ Boolean
235 236 237 238 |
# File 'lib/kazoo/consumergroup.rb', line 235
# Tells whether this instance's registration node
# (/consumers/<group>/ids/<id>) currently exists.
#
# @return [Boolean] the +exists?+ answer of the stat returned for that path
def registered?
  registration_path = "/consumers/#{group.name}/ids/#{id}"
  cluster.zk.stat(path: registration_path).fetch(:stat).exists?
end
#release_partition(partition) ⇒ Object
296 297 298 299 300 301 |
# File 'lib/kazoo/consumergroup.rb', line 296
# Releases this group's claim on +partition+ by deleting its owner node,
# /consumers/<group>/owners/<topic>/<partition id>.
#
# @param partition [#topic, #id] the partition to release; +partition.topic.name+
#   and +partition.id+ are used to build the node path.
# @return [nil] when the delete call reports ZOK
# @raise [Kazoo::Error] when the delete call reports any other return code
def release_partition(partition)
  owner_path = "/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}"
  rc = cluster.zk.delete(path: owner_path).fetch(:rc)
  return if rc == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end