Class: Kafka::BrokerPool

Inherits:
Object
Defined in:
lib/kafka/broker_pool.rb

Overview

A broker pool represents the set of brokers in a cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the pool can connect to will be asked for the cluster metadata, which allows the pool to map topic partitions to the current leader for those partitions.
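The seed-broker fallback described above can be sketched as follows. This is only an illustration under stated assumptions: `fetch_metadata_from_first_reachable` is a hypothetical helper, not part of the documented API, the `host:port` address strings are made up, and the block stands in for whatever the pool actually does to connect and request metadata.

```ruby
# Hypothetical helper (not part of Kafka::BrokerPool): tries each seed broker
# in order and returns the metadata from the first one that answers.
def fetch_metadata_from_first_reachable(seed_brokers)
  raise ArgumentError, "seed_brokers must not be empty" if seed_brokers.empty?

  seed_brokers.each do |address|
    begin
      # The block stands in for "connect and ask for cluster metadata".
      return yield(address)
    rescue IOError, SystemCallError => e
      warn "Seed broker #{address} unreachable: #{e.class}"
    end
  end

  raise IOError, "could not reach any seed broker"
end

# Simulated cluster: the first seed refuses the connection, the second answers.
attempted = []
seeds = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
metadata = fetch_metadata_from_first_reachable(seeds) do |address|
  attempted << address
  raise Errno::ECONNREFUSED if address == "kafka1:9092"
  # Made-up metadata: maps [topic, partition] to the leader's broker id.
  { leaders: { ["greetings", 0] => 2 } }
end
```

In this sketch the third seed is never contacted, because the loop returns as soon as one broker responds.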

Instance Method Summary

Constructor Details

#initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil) ⇒ BrokerPool

Initializes a broker pool with a set of seed brokers.

The pool will try to fetch cluster metadata from one of the brokers.

Parameters:

  • seed_brokers (Array<String>)
  • client_id (String)
  • logger (Logger)
  • socket_timeout (Integer, nil) (defaults to: nil)


# File 'lib/kafka/broker_pool.rb', line 19

def initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil)
  @client_id = client_id
  @logger = logger
  @socket_timeout = socket_timeout
  @brokers = {}
  @seed_brokers = seed_brokers
  @cluster_info = nil
end
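A minimal sketch of calling the constructor. To keep it runnable without the gem, `ConstructorSketchPool` is a stand-in that copies only the constructor shown above; the host names, client id, and timeout value are illustrative assumptions.

```ruby
require "logger"
require "stringio"

# Stand-in copying only the constructor shown above; not the real class.
class ConstructorSketchPool
  def initialize(seed_brokers:, client_id:, logger:, socket_timeout: nil)
    @client_id = client_id
    @logger = logger
    @socket_timeout = socket_timeout
    @brokers = {}
    @seed_brokers = seed_brokers
    @cluster_info = nil
  end
end

log_output = StringIO.new
pool = ConstructorSketchPool.new(
  seed_brokers: ["kafka1.example.com:9092", "kafka2.example.com:9092"], # hypothetical hosts
  client_id: "example-app",                                             # hypothetical id
  logger: Logger.new(log_output),
  socket_timeout: 10
)

# The constructor only stores its arguments: no metadata is held yet.
metadata_before_first_use = pool.instance_variable_get(:@cluster_info)

# seed_brokers, client_id and logger are required keywords; omitting one
# raises ArgumentError before any instance is created.
missing_keyword_error =
  begin
    ConstructorSketchPool.new(seed_brokers: ["kafka1.example.com:9092"], client_id: "example-app")
    nil
  rescue ArgumentError => e
    e
  end
```

Only `socket_timeout` has a default, so it is the one argument that can be left out.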

Instance Method Details

#get_broker(broker_id) ⇒ Object



# File 'lib/kafka/broker_pool.rb', line 41

def get_broker(broker_id)
  # Reuse the cached connection for this broker id, connecting on first use.
  @brokers[broker_id] ||= connect_to_broker(broker_id)
end
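The `||=` in `get_broker` caches one connection per broker id. The sketch below shows that behavior in isolation; `MemoizingPool` and its `connect_to_broker` are hypothetical stand-ins, since the real private method is not shown on this page.

```ruby
# Stand-in class that isolates the caching pattern used by get_broker.
class MemoizingPool
  attr_reader :connect_calls

  def initialize
    @brokers = {}
    @connect_calls = 0
  end

  def get_broker(broker_id)
    @brokers[broker_id] ||= connect_to_broker(broker_id)
  end

  private

  # Hypothetical replacement for the real connection logic: it counts calls
  # and returns a placeholder object instead of opening a socket.
  def connect_to_broker(broker_id)
    @connect_calls += 1
    "connection-to-broker-#{broker_id}"
  end
end

pool = MemoizingPool.new
first = pool.get_broker(1)
second = pool.get_broker(1) # served from the cache, no second connect
pool.get_broker(2)          # a different id triggers a new connect
```

Because `||=` only assigns when the stored value is `nil` or `false`, a repeated lookup for the same id returns the same object.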

#get_leader_id(topic, partition) ⇒ Integer

Finds the broker acting as the leader of the given topic and partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Integer)

    the broker id.



# File 'lib/kafka/broker_pool.rb', line 37

def get_leader_id(topic, partition)
  cluster_info.find_leader_id(topic, partition)
end

#mark_as_stale! ⇒ Object



# File 'lib/kafka/broker_pool.rb', line 28

def mark_as_stale!
  # Drop the cached cluster metadata.
  @cluster_info = nil
end
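A sketch of how clearing `@cluster_info` could force a fresh metadata fetch on the next lookup. It assumes the private `cluster_info` reader memoizes its result with `||=`, which this page does not show; `StalePool`, `fetch_cluster_info`, and the topic data are all hypothetical.

```ruby
# Stand-in class; the private reader below is an assumption about how the
# real pool rebuilds its metadata after mark_as_stale!.
class StalePool
  attr_reader :fetch_count

  def initialize
    @cluster_info = nil
    @fetch_count = 0
  end

  def mark_as_stale!
    @cluster_info = nil
  end

  def get_leader_id(topic, partition)
    cluster_info.fetch([topic, partition])
  end

  private

  # Assumed lazy reader: fetches only when nothing is cached.
  def cluster_info
    @cluster_info ||= fetch_cluster_info
  end

  # Hypothetical fetch: counts calls and returns made-up leader assignments.
  def fetch_cluster_info
    @fetch_count += 1
    { ["greetings", 0] => 1, ["greetings", 1] => 2 }
  end
end

pool = StalePool.new
leader = pool.get_leader_id("greetings", 0)
pool.get_leader_id("greetings", 1)  # reuses the cached metadata
fetches_before = pool.fetch_count

pool.mark_as_stale!                 # e.g. after a leadership change is suspected
pool.get_leader_id("greetings", 0)  # triggers a second fetch
fetches_after = pool.fetch_count
```

Under this assumption, callers would invoke `mark_as_stale!` when a request fails in a way that suggests the leader has moved, and the next lookup pays the cost of refreshing the metadata.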

#partitions_for(topic) ⇒ Object



# File 'lib/kafka/broker_pool.rb', line 45

def partitions_for(topic)
  cluster_info.partitions_for(topic)
end

#shutdown ⇒ Object



# File 'lib/kafka/broker_pool.rb', line 53

def shutdown
  # Disconnect every broker connection the pool has cached so far.
  @brokers.each do |id, broker|
    @logger.info "Disconnecting broker #{id}"
    broker.disconnect
  end
end

#topics ⇒ Object



# File 'lib/kafka/broker_pool.rb', line 49

def topics
  # Return the name of each topic listed in the cluster metadata.
  cluster_info.topics.map(&:topic_name)
end