Class: Kazoo::Partition

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/partition.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, id, replicas: nil) ⇒ Partition

Returns a new instance of Partition.



5
6
7
8
# File 'lib/kazoo/partition.rb', line 5

def initialize(topic, id, replicas: nil)
  @topic, @id, @replicas = topic, id, replicas
  @mutex = Mutex.new
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



3
4
5
# File 'lib/kazoo/partition.rb', line 3

def id
  @id
end

#replicasObject (readonly)

Returns the value of attribute replicas.



3
4
5
# File 'lib/kazoo/partition.rb', line 3

def replicas
  @replicas
end

#topicObject (readonly)

Returns the value of attribute topic.



3
4
5
# File 'lib/kazoo/partition.rb', line 3

def topic
  @topic
end

Instance Method Details

#clusterObject



10
11
12
# File 'lib/kazoo/partition.rb', line 10

def cluster
  topic.cluster
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


61
62
63
# File 'lib/kazoo/partition.rb', line 61

def eql?(other)
  other.kind_of?(Kazoo::Partition) && topic == other.topic && id == other.id
end

#hashObject



67
68
69
# File 'lib/kazoo/partition.rb', line 67

def hash
  [topic, id].hash
end

#inspectObject



53
54
55
# File 'lib/kazoo/partition.rb', line 53

def inspect
  "#<Kazoo::Partition #{topic.name}/#{id}>"
end

#isrObject



29
30
31
32
33
34
# File 'lib/kazoo/partition.rb', line 29

def isr
  @mutex.synchronize do
    refresh_state
    @isr
  end
end

#keyObject



57
58
59
# File 'lib/kazoo/partition.rb', line 57

def key
  "#{topic.name}/#{id}"
end

#leaderObject



22
23
24
25
26
27
# File 'lib/kazoo/partition.rb', line 22

def leader
  @mutex.synchronize do
    refresh_state
    @leader
  end
end

#preferred_leaderObject



18
19
20
# File 'lib/kazoo/partition.rb', line 18

def preferred_leader
  @replicas.first
end

#replication_factorObject



14
15
16
# File 'lib/kazoo/partition.rb', line 14

def replication_factor
  replicas.length
end

#to_json(generator) ⇒ Object



84
85
86
# File 'lib/kazoo/partition.rb', line 84

def to_json(generator)
  generator.generate(topic: topic.name, partition: id)
end

#under_replicated?Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/kazoo/partition.rb', line 36

def under_replicated?
  isr.length < replication_factor
end

#valid?Boolean

Returns:

  • (Boolean)


47
48
49
50
51
# File 'lib/kazoo/partition.rb', line 47

def valid?
  validate
rescue Kazoo::ValidationError
  false
end

#validateObject



40
41
42
43
44
45
# File 'lib/kazoo/partition.rb', line 40

def validate
  raise Kazoo::ValidationError, "No replicas defined for #{topic.name}/#{id}" if replicas.length == 0
  raise Kazoo::ValidationError, "The replicas of #{topic.name}/#{id} should be assigned to different brokers" if replicas.length > replicas.uniq.length

  true
end

#wait_for_leaderObject



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/kazoo/partition.rb', line 71

def wait_for_leader
  current_leader = nil
  while current_leader.nil?
    current_leader = begin
      leader
    rescue Kazoo::Error
      nil
    end

    sleep(0.1) if current_leader.nil?
  end
end