Class: Kafka::BrokerPool
- Inherits: Object
- Defined in: lib/kafka/broker_pool.rb
Overview
A broker pool represents the set of brokers in a cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the pool can connect to will be asked for the cluster metadata, which allows the pool to map topic partitions to the current leader for those partitions.
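The seed-broker fallback described above can be sketched roughly as follows. This is a self-contained illustration, not the library's implementation: `FakeConnectionError`, `fetch_metadata_from_seeds`, the broker addresses, and the shape of the metadata hash are all hypothetical names chosen for the example, and the block stands in for the real connect-and-fetch step.

```ruby
# Hypothetical stand-in for a connection error; not part of the documented API.
class FakeConnectionError < StandardError; end

# Hypothetical helper: try each seed broker in order and return the metadata
# from the first one that answers. The block simulates connecting to a broker
# and asking it for cluster metadata.
def fetch_metadata_from_seeds(seed_brokers)
  raise ArgumentError, "seed_brokers must not be empty" if seed_brokers.empty?

  seed_brokers.each do |address|
    begin
      return yield(address)
    rescue FakeConnectionError
      # This seed broker is unreachable; move on to the next one.
      next
    end
  end

  raise FakeConnectionError, "could not reach any seed broker"
end

# Simulated cluster: only the second seed broker is reachable.
metadata = fetch_metadata_from_seeds(["kafka1:9092", "kafka2:9092"]) do |address|
  raise FakeConnectionError, "#{address} is down" if address == "kafka1:9092"

  # Assumed metadata shape: [topic, partition] mapped to the leader's broker id.
  { ["greetings", 0] => 1, ["greetings", 1] => 2 }
end

leader_id = metadata.fetch(["greetings", 1])
```

In this sketch the first seed broker fails, the second supplies the metadata, and the leader for a topic partition is then a plain lookup.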
Instance Method Summary
- #get_broker(broker_id) ⇒ Object
- #get_leader_id(topic, partition) ⇒ Integer
  Finds the broker acting as the leader of the given topic and partition.
- #initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil) ⇒ BrokerPool (constructor)
  Initializes a broker pool with a set of seed brokers.
- #mark_as_stale! ⇒ Object
- #partitions_for(topic) ⇒ Object
- #shutdown ⇒ Object
- #topics ⇒ Object
Constructor Details
#initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil) ⇒ BrokerPool
Initializes a broker pool with a set of seed brokers.
The pool will try to fetch cluster metadata from one of the brokers.
    # File 'lib/kafka/broker_pool.rb', line 19
    def initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil)
      @client_id = client_id
      @logger = logger
      @socket_timeout = socket_timeout
      @brokers = {}
      @seed_brokers = seed_brokers
      @cluster_info = nil
    end
Instance Method Details
#get_broker(broker_id) ⇒ Object
    # File 'lib/kafka/broker_pool.rb', line 41
    def get_broker(broker_id)
      @brokers[broker_id] ||= connect_to_broker(broker_id)
    end
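The `||=` in `get_broker` caches one connection per broker id, so repeated lookups reuse it. A minimal sketch of that pattern, assuming a stand-in class: `MemoizingPool`, its `connection_count` counter, and the string returned by `connect_to_broker` are hypothetical, since the real connection logic is not shown on this page.

```ruby
# Hypothetical stand-in for the pool; counts how often a connection is opened.
class MemoizingPool
  attr_reader :connection_count

  def initialize
    @brokers = {}
    @connection_count = 0
  end

  def get_broker(broker_id)
    # Connect only if no broker is cached for this id yet.
    @brokers[broker_id] ||= connect_to_broker(broker_id)
  end

  private

  # Placeholder for the real connection logic, which is not documented here.
  def connect_to_broker(broker_id)
    @connection_count += 1
    "connection-to-broker-#{broker_id}"
  end
end

pool = MemoizingPool.new
first = pool.get_broker(1)
second = pool.get_broker(1) # served from the cache, no new connection
pool.get_broker(2)          # a different id opens a second connection
```

Here two calls for broker 1 return the same object, and only two connections are opened in total.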
#get_leader_id(topic, partition) ⇒ Integer
Finds the broker acting as the leader of the given topic and partition.
    # File 'lib/kafka/broker_pool.rb', line 37
    def get_leader_id(topic, partition)
      cluster_info.find_leader_id(topic, partition)
    end
#mark_as_stale! ⇒ Object
    # File 'lib/kafka/broker_pool.rb', line 28
    def mark_as_stale!
      @cluster_info = nil
    end
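Clearing `@cluster_info` presumably forces the next metadata lookup to fetch fresh data. The private `cluster_info` reader is not shown on this page, so the sketch below assumes it memoizes lazily with `||=`. `StaleAwarePool`, `fetch_cluster_info`, `fetch_count`, and the metadata hash are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in; assumes a lazily memoized private cluster_info reader.
class StaleAwarePool
  attr_reader :fetch_count

  def initialize
    @cluster_info = nil
    @fetch_count = 0
  end

  def mark_as_stale!
    @cluster_info = nil
  end

  def topics
    cluster_info.keys
  end

  private

  # Assumption: metadata is fetched on first use and cached until marked stale.
  def cluster_info
    @cluster_info ||= fetch_cluster_info
  end

  # Placeholder for a real metadata request to a broker.
  def fetch_cluster_info
    @fetch_count += 1
    { "greetings" => [0, 1] }
  end
end

pool = StaleAwarePool.new
pool.topics          # first use triggers a fetch
pool.topics          # served from the cached metadata
pool.mark_as_stale!
pool.topics          # cache was cleared, so this fetches again
```

Under that assumption, a caller that hits a leadership error could call `mark_as_stale!` and retry, picking up the new leader on the next lookup.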
#partitions_for(topic) ⇒ Object
    # File 'lib/kafka/broker_pool.rb', line 45
    def partitions_for(topic)
      cluster_info.partitions_for(topic)
    end
#shutdown ⇒ Object
    # File 'lib/kafka/broker_pool.rb', line 53
    def shutdown
      @brokers.each do |id, broker|
        @logger.info "Disconnecting broker #{id}"
        broker.disconnect
      end
    end
#topics ⇒ Object
    # File 'lib/kafka/broker_pool.rb', line 49
    def topics
      cluster_info.topics.map(&:topic_name)
    end