Class: Kazoo::Broker
- Inherits: Object
  - Object
  - Kazoo::Broker
- Defined in: lib/kazoo/broker.rb
Overview
Kazoo::Broker represents a Kafka broker in a Kafka cluster.
Instance Attribute Summary
- #cluster ⇒ Object (readonly): Returns the value of attribute cluster.
- #host ⇒ Object (readonly): Returns the value of attribute host.
- #id ⇒ Object (readonly): Returns the value of attribute id.
- #jmx_port ⇒ Object (readonly): Returns the value of attribute jmx_port.
- #port ⇒ Object (readonly): Returns the value of attribute port.
Class Method Summary
- .from_json(cluster, id, json) ⇒ Object: Instantiates a Kazoo::Broker instance based on the Broker metadata that is stored in Zookeeper under `/brokers/<id>`.
Instance Method Summary
- #addr ⇒ Object: Returns the address of this broker, i.e. the hostname plus the port to connect to.
- #critical?(replicas: 1) ⇒ Boolean: Returns whether this broker is currently considered critical.
- #eql?(other) ⇒ Boolean (also: #==)
- #hash ⇒ Object
- #initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker (constructor): A new instance of Broker.
- #inspect ⇒ Object
- #led_partitions ⇒ Object: Returns a list of all partitions that are currently led by this broker.
- #replicated_partitions ⇒ Object: Returns a list of all partitions that host a replica on this broker.
Constructor Details
#initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker
Returns a new instance of Broker.
# File 'lib/kazoo/broker.rb', line 7
def initialize(cluster, id, host, port, jmx_port: nil)
  @cluster = cluster
  @id, @host, @port = id, host, port
  @jmx_port = jmx_port
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
# File 'lib/kazoo/broker.rb', line 5
def cluster
  @cluster
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
# File 'lib/kazoo/broker.rb', line 5
def host
  @host
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/kazoo/broker.rb', line 5
def id
  @id
end
#jmx_port ⇒ Object (readonly)
Returns the value of attribute jmx_port.
# File 'lib/kazoo/broker.rb', line 5
def jmx_port
  @jmx_port
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/kazoo/broker.rb', line 5
def port
  @port
end
Class Method Details
.from_json(cluster, id, json) ⇒ Object
Instantiates a Kazoo::Broker instance based on the Broker metadata that is stored in Zookeeper under `/brokers/<id>`. TODO: add support for endpoints in Kafka 0.9+
# File 'lib/kazoo/broker.rb', line 83
def self.from_json(cluster, id, json)
  case json.fetch('version')
  when 1, 2, 3, 4
    new(cluster, id.to_i, json.fetch('host'), json.fetch('port'), jmx_port: json.fetch('jmx_port', nil))
  else
    raise Kazoo::VersionNotSupported
  end
end
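As an illustration of the parsing step, the following sketch feeds a made-up metadata payload through the same version check. `JsonBrokerSketch` is a hypothetical stand-in, not part of Kazoo, and it raises `ArgumentError` instead of `Kazoo::VersionNotSupported` so that the example stays self-contained. The payload values are invented for the example.

```ruby
require 'json'

# Hypothetical stand-in for Kazoo::Broker that keeps only the parsed fields.
JsonBrokerSketch = Struct.new(:cluster, :id, :host, :port, :jmx_port) do
  # Mirrors the version check shown above; the error class is an assumption.
  def self.from_json(cluster, id, json)
    case json.fetch('version')
    when 1, 2, 3, 4
      new(cluster, id.to_i, json.fetch('host'), json.fetch('port'), json.fetch('jmx_port', nil))
    else
      raise ArgumentError, "unsupported broker metadata version: #{json['version']}"
    end
  end
end

# Illustrative payload; the id arrives as a string and is converted with to_i.
payload = '{"version": 1, "host": "kafka1.example.com", "port": 9092, "jmx_port": 9999}'
broker = JsonBrokerSketch.from_json(:my_cluster, '3', JSON.parse(payload))

puts broker.id                        # prints 3
puts "#{broker.host}:#{broker.port}"  # prints kafka1.example.com:9092
```

Because `jmx_port` is read with a default of `nil`, a payload without that key still parses; only a missing `version`, `host`, or `port` raises `KeyError`.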
Instance Method Details
#addr ⇒ Object
Returns the address of this broker, i.e. the hostname plus the port to connect to.
# File 'lib/kazoo/broker.rb', line 62
def addr
  "#{host}:#{port}"
end
#critical?(replicas: 1) ⇒ Boolean
Returns whether this broker is currently considered critical.
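A broker is considered critical if it is the only in-sync replica (ISR) of any of the partitions it hosts. If this broker were to go down, such a partition would become unavailable for writes and, depending on the configuration, could also lose data. More generally, with the replicas: option the broker is critical if removing it from a partition's ISR would leave fewer than that many in-sync replicas.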
# File 'lib/kazoo/broker.rb', line 47
def critical?(replicas: 1)
  result, mutex = false, Mutex.new
  threads = replicated_partitions.map do |partition|
    Thread.new do
      Thread.abort_on_exception = true
      isr = partition.isr.reject { |r| r == self }
      mutex.synchronize { result = true if isr.length < Integer(replicas) }
    end
  end
  threads.each(&:join)
  result
end
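The threshold check can be easier to follow without the threading. The sketch below is a sequential restatement under simplifying assumptions: `PartitionSketch` and `critical_broker?` are hypothetical names, brokers are reduced to integer ids, and the caller is expected to pass only partitions that have a replica on the broker in question.

```ruby
# Hypothetical stand-in: a partition reduced to a name and its ISR (broker ids).
PartitionSketch = Struct.new(:name, :isr)

# Would taking broker_id out of the ISR leave any partition with fewer than
# `replicas` in-sync replicas? Sequential restatement of the check above.
def critical_broker?(broker_id, partitions, replicas: 1)
  partitions.any? do |partition|
    remaining = partition.isr.reject { |id| id == broker_id }
    remaining.length < Integer(replicas)
  end
end

partitions = [
  PartitionSketch.new('events/0', [1, 2, 3]),
  PartitionSketch.new('events/1', [1, 2])
]

puts critical_broker?(1, partitions)               # prints false
puts critical_broker?(1, partitions, replicas: 2)  # prints true
```

With the default of one required replica, broker 1 is not critical because every partition keeps at least one other in-sync replica. Requiring two makes it critical, since `events/1` would be left with only broker 2.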
#eql?(other) ⇒ Boolean Also known as: ==
# File 'lib/kazoo/broker.rb', line 66
def eql?(other)
  other.is_a?(Kazoo::Broker) && other.cluster == self.cluster && other.id == self.id
end
#hash ⇒ Object
# File 'lib/kazoo/broker.rb', line 72
def hash
  [self.cluster, self.id].hash
end
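Defining `eql?` and `hash` over the same pair (cluster, id) is what lets brokers act as Hash keys and be deduplicated. The sketch below shows that effect with `EqBrokerSketch`, a hypothetical stand-in that copies only the equality contract shown above; the symbol `:prod` and the host names are invented.

```ruby
# Hypothetical stand-in that copies only the equality contract shown above.
class EqBrokerSketch
  attr_reader :cluster, :id, :host

  def initialize(cluster, id, host)
    @cluster, @id, @host = cluster, id, host
  end

  # Two brokers are equal when cluster and id match; host is ignored.
  def eql?(other)
    other.is_a?(EqBrokerSketch) && other.cluster == cluster && other.id == id
  end
  alias_method :==, :eql?

  # Must agree with eql?: equal brokers produce equal hash values.
  def hash
    [cluster, id].hash
  end
end

a = EqBrokerSketch.new(:prod, 1, 'kafka1.example.com')
b = EqBrokerSketch.new(:prod, 1, 'kafka1.internal') # same cluster and id
c = EqBrokerSketch.new(:prod, 2, 'kafka2.example.com')

puts a == b                 # prints true
puts [a, b, c].uniq.length  # prints 2

leaders = { a => ['events/0'] }
puts leaders[b].inspect     # prints ["events/0"]
```

Note that host and port play no part in the comparison, so two objects describing the same broker id under different addresses are treated as the same broker.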
#inspect ⇒ Object
# File 'lib/kazoo/broker.rb', line 76
def inspect
  "#<Kazoo::Broker id=#{id} addr=#{addr}>"
end
#led_partitions ⇒ Object
Returns a list of all partitions that are currently led by this broker.
# File 'lib/kazoo/broker.rb', line 14
def led_partitions
  result, mutex = [], Mutex.new
  threads = cluster.partitions.map do |partition|
    Thread.new do
      Thread.abort_on_exception = true
      select = partition.leader == self
      mutex.synchronize { result << partition } if select
    end
  end
  threads.each(&:join)
  result
end
#replicated_partitions ⇒ Object
Returns a list of all partitions that host a replica on this broker.
# File 'lib/kazoo/broker.rb', line 28
def replicated_partitions
  result, mutex = [], Mutex.new
  threads = cluster.partitions.map do |partition|
    Thread.new do
      Thread.abort_on_exception = true
      select = partition.replicas.include?(self)
      mutex.synchronize { result << partition } if select
    end
  end
  threads.each(&:join)
  result
end
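Both #led_partitions and #replicated_partitions follow the same shape: one thread per partition evaluates a predicate, and a mutex guards the shared result array. The sketch below isolates that pattern in a hypothetical helper, `threaded_select`, which is not part of Kazoo. Partitions are modelled as plain hashes for illustration. The sketch relies on `Thread#join` re-raising an exception from a failed thread rather than setting `Thread.abort_on_exception`.

```ruby
# Hypothetical helper: evaluates the block for each item in its own thread and
# collects the items for which the block returned a truthy value.
def threaded_select(items)
  result = []
  mutex = Mutex.new
  threads = items.map do |item|
    Thread.new do
      keep = yield(item)                                 # may block on I/O
      mutex.synchronize { result << item } if keep       # guard the shared array
    end
  end
  threads.each(&:join) # join re-raises any exception raised inside a thread
  result
end

# Illustrative data; in Kazoo the predicate would query partition state instead.
partitions = [
  { name: 'events/0', leader: 1 },
  { name: 'events/1', leader: 2 },
  { name: 'events/2', leader: 1 }
]

led = threaded_select(partitions) { |partition| partition[:leader] == 1 }
puts led.map { |partition| partition[:name] }.sort.inspect # prints ["events/0", "events/2"]
```

The result order follows thread completion, not input order, which is why the usage sorts before printing. Callers of the documented methods should likewise not rely on the order of the returned partitions.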