Class: Kazoo::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/broker.rb

Overview

Kazoo::Broker represents a Kafka broker in a Kafka cluster.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker

Returns a new instance of Broker.



7
8
9
10
11
# File 'lib/kazoo/broker.rb', line 7

# Stores the broker's identity and connection details on the new instance.
#
# @param cluster the cluster this broker belongs to (stored as-is)
# @param id the broker id (stored as-is; no conversion happens here)
# @param host the broker's host
# @param port the broker's port
# @param jmx_port the broker's JMX port, or nil when not given
def initialize(cluster, id, host, port, jmx_port: nil)
  @cluster  = cluster
  @id       = id
  @host     = host
  @port     = port
  @jmx_port = jmx_port
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



5
6
7
# File 'lib/kazoo/broker.rb', line 5

# Reader for the cluster this broker was constructed with.
#
# @return [Object] the value held in @cluster
def cluster
  @cluster
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



5
6
7
# File 'lib/kazoo/broker.rb', line 5

# Reader for the host this broker was constructed with.
#
# @return [Object] the value held in @host
def host
  @host
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



5
6
7
# File 'lib/kazoo/broker.rb', line 5

# Reader for the broker id this broker was constructed with.
#
# @return [Object] the value held in @id
def id
  @id
end

#jmx_port ⇒ Object (readonly)

Returns the value of attribute jmx_port.



5
6
7
# File 'lib/kazoo/broker.rb', line 5

# Reader for the JMX port this broker was constructed with.
#
# @return [Object, nil] the value held in @jmx_port (the constructor defaults it to nil)
def jmx_port
  @jmx_port
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



5
6
7
# File 'lib/kazoo/broker.rb', line 5

# Reader for the port this broker was constructed with.
#
# @return [Object] the value held in @port
def port
  @port
end

Class Method Details

.from_json(cluster, id, json) ⇒ Object

Instantiates a Kazoo::Broker instance based on the broker metadata that is stored in Zookeeper under `/brokers/<id>`. TODO: add support for endpoints in Kafka 0.9+.



83
84
85
86
87
88
89
90
# File 'lib/kazoo/broker.rb', line 83

# Builds a broker from an already-parsed broker metadata document.
#
# Only metadata versions 1 through 4 are accepted. The 'version', 'host' and
# 'port' keys are read with +fetch+, so a missing key surfaces as whatever
# +json.fetch+ raises; 'jmx_port' is optional and falls back to nil.
#
# @param cluster the cluster the broker belongs to (passed through to +new+)
# @param id the broker id; converted with +to_i+ before being stored
# @param json the metadata document; must respond to +fetch+ with String keys
# @return the object returned by +new+
# @raise [Kazoo::VersionNotSupported] when 'version' is not 1, 2, 3 or 4
def self.from_json(cluster, id, json)
  version = json.fetch('version')
  raise Kazoo::VersionNotSupported unless [1, 2, 3, 4].include?(version)

  # Evaluated in the same order as the arguments of the original call.
  broker_id = id.to_i
  host      = json.fetch('host')
  port      = json.fetch('port')
  jmx_port  = json.fetch('jmx_port', nil)

  new(cluster, broker_id, host, port, jmx_port: jmx_port)
end

Instance Method Details

#addr ⇒ Object

Returns the address of this broker, i.e. the hostname plus the port to connect to.



62
63
64
# File 'lib/kazoo/broker.rb', line 62

# Builds the "host:port" address string for this broker.
#
# @return [String] +host+ and +port+ joined by a colon
def addr
  format('%s:%s', host, port)
end

#critical?(replicas: 1) ⇒ Boolean

Returns whether this broker is currently considered critical.

A broker is considered critical if it is the only in-sync replica of any of the partitions it hosts. This means that if this broker were to go down, the partition would become unavailable for writes, and may also lose data depending on the configuration and settings.

Returns:

  • (Boolean)


47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kazoo/broker.rb', line 47

# Returns whether this broker is currently considered critical.
#
# For every partition in +replicated_partitions+, the partition's ISR is taken
# with this broker removed. The broker is critical when, for at least one
# partition, fewer than +replicas+ entries remain.
#
# Each partition is checked in its own thread; all threads are waited on
# before the result is returned.
#
# @param replicas the minimum number of other in-sync replicas that must
#   remain; converted with +Integer()+
# @return [Boolean] true if any partition would drop below +replicas+
def critical?(replicas: 1)
  checks = replicated_partitions.map do |partition|
    Thread.new do
      # Scope the flag to this worker thread. Assigning
      # Thread.abort_on_exception would flip the setting for every thread in
      # the process and leave it set after this method returns.
      Thread.current.abort_on_exception = true
      remaining_isr = partition.isr.reject { |replica| replica == self }
      remaining_isr.length < Integer(replicas)
    end
  end

  # Thread#value joins each worker and hands back its block result, so no
  # shared flag or mutex is needed.
  checks.map(&:value).any?
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


66
67
68
# File 'lib/kazoo/broker.rb', line 66

# Two brokers are equal when they belong to the same cluster and share the
# same id. Anything that is not a Kazoo::Broker is never equal.
#
# @param other the object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.is_a?(Kazoo::Broker)
  return false unless other.cluster == cluster

  other.id == id
end

#hash ⇒ Object



72
73
74
# File 'lib/kazoo/broker.rb', line 72

# Hash code derived from the cluster and the broker id, the same two values
# that eql? compares.
#
# @return [Integer]
def hash
  identity = [cluster, id]
  identity.hash
end

#inspect ⇒ Object



76
77
78
# File 'lib/kazoo/broker.rb', line 76

def inspect
  "#<Kazoo::Broker id=#{id} addr=#{addr}>"
end

#led_partitions ⇒ Object

Returns a list of all partitions that are currently led by this broker.



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/kazoo/broker.rb', line 14

# Returns the partitions that are currently led by this broker.
#
# Every partition in +cluster.partitions+ is checked in its own thread by
# comparing +partition.leader+ with this broker. All threads are waited on
# before returning, and the result keeps the order of +cluster.partitions+.
#
# @return [Array] the partitions whose leader equals this broker
def led_partitions
  lookups = cluster.partitions.map do |partition|
    worker = Thread.new do
      # Scope the flag to this worker thread. Assigning
      # Thread.abort_on_exception would flip the setting for every thread in
      # the process and leave it set after this method returns.
      Thread.current.abort_on_exception = true
      partition.leader == self
    end
    [partition, worker]
  end

  # Thread#value joins the worker and returns its block result; collecting by
  # input position makes the output order independent of thread scheduling.
  lookups.select { |_partition, worker| worker.value }.map(&:first)
end

#replicated_partitions ⇒ Object

Returns a list of all partitions that host a replica on this broker.



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kazoo/broker.rb', line 28

# Returns the partitions that host a replica on this broker.
#
# Every partition in +cluster.partitions+ is checked in its own thread by
# testing whether +partition.replicas+ includes this broker. All threads are
# waited on before returning, and the result keeps the order of
# +cluster.partitions+.
#
# @return [Array] the partitions whose replicas include this broker
def replicated_partitions
  lookups = cluster.partitions.map do |partition|
    worker = Thread.new do
      # Scope the flag to this worker thread. Assigning
      # Thread.abort_on_exception would flip the setting for every thread in
      # the process and leave it set after this method returns.
      Thread.current.abort_on_exception = true
      partition.replicas.include?(self)
    end
    [partition, worker]
  end

  # Thread#value joins the worker and returns its block result; collecting by
  # input position makes the output order independent of thread scheduling.
  lookups.select { |_partition, worker| worker.value }.map(&:first)
end