Class: Kafka::Cluster
- Inherits: Object
  - Object
    - Kafka::Cluster
- Defined in: lib/kafka/cluster.rb
Overview
A cluster represents the state of a Kafka cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the cluster can connect to will be asked for the cluster metadata, which allows the cluster to map topic partitions to the current leader for those partitions.
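The seed-broker fallback described above can be sketched as follows. This page does not show how the cluster iterates over its seed brokers, so everything here is an assumption made for illustration: `SketchBroker`, `fetch_metadata_from_seeds`, the use of `IOError` for connection failures, and the metadata shape (a Hash from `[topic, partition]` to a leader id) are hypothetical and are not part of the library.

```ruby
# Hypothetical stand-in for a broker connection; not the library's Broker class.
class SketchBroker
  def initialize(host, reachable:, metadata: nil)
    @host = host
    @reachable = reachable
    @metadata = metadata
  end

  # Returns the metadata, or raises if the broker cannot be reached.
  def fetch_metadata
    raise IOError, "cannot connect to #{@host}" unless @reachable
    @metadata
  end
end

# Asks each seed broker in order and returns the first metadata response.
def fetch_metadata_from_seeds(seed_brokers)
  if seed_brokers.empty?
    raise ArgumentError, "At least one seed broker must be configured"
  end

  seed_brokers.each do |broker|
    begin
      return broker.fetch_metadata
    rescue IOError
      next # unreachable seed; try the following one
    end
  end

  raise IOError, "could not connect to any seed broker"
end

seeds = [
  SketchBroker.new("kafka1:9092", reachable: false),
  SketchBroker.new("kafka2:9092", reachable: true, metadata: { ["greetings", 0] => 2 }),
]

# Assumed metadata shape: [topic, partition] => id of the leader broker.
metadata = fetch_metadata_from_seeds(seeds)
leader_id = metadata.fetch(["greetings", 0])
```

Here the first seed is unreachable, so the metadata comes from the second one, and the leader of partition 0 of the "greetings" topic is looked up from that response.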
Instance Method Summary
- #add_target_topics(topics) ⇒ Object
- #disconnect ⇒ Object
- #get_leader(topic, partition) ⇒ Broker
  Finds the broker acting as the leader of the given topic and partition.
- #initialize(seed_brokers:, broker_pool:, logger:) ⇒ Cluster (constructor)
  Initializes a Cluster with a set of seed brokers.
- #mark_as_stale! ⇒ Object
- #partitions_for(topic) ⇒ Object
- #refresh_metadata! ⇒ Object
- #refresh_metadata_if_necessary! ⇒ Object
- #topics ⇒ Object
Constructor Details
#initialize(seed_brokers:, broker_pool:, logger:) ⇒ Cluster
Initializes a Cluster with a set of seed brokers.
The cluster will try to fetch cluster metadata from one of the brokers.
    # File 'lib/kafka/cluster.rb', line 19

    def initialize(seed_brokers:, broker_pool:, logger:)
      if seed_brokers.empty?
        raise ArgumentError, "At least one seed broker must be configured"
      end

      @logger = logger
      @seed_brokers = seed_brokers
      @broker_pool = broker_pool
      @cluster_info = nil
      @stale = true

      # This is the set of topics we need metadata for. If empty, metadata for
      # all topics will be fetched.
      @target_topics = Set.new
    end
Instance Method Details
#add_target_topics(topics) ⇒ Object
    # File 'lib/kafka/cluster.rb', line 35

    def add_target_topics(topics)
      new_topics = Set.new(topics) - @target_topics

      unless new_topics.empty?
        @logger.info "New topics added to target list: #{new_topics.to_a.join(', ')}"

        @target_topics.merge(new_topics)

        refresh_metadata!
      end
    end
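The set arithmetic in add_target_topics means that only topics not already tracked are logged and merged, and that repeating a call with the same topics does nothing. A minimal sketch of just that step, using a local `target_topics` variable in place of the instance variable (the topic names are made up):

```ruby
require "set"

# Topics the cluster already tracks.
target_topics = Set.new(["greetings"])

# Duplicates collapse inside the Set, and known topics are subtracted.
new_topics = Set.new(["greetings", "orders", "orders"]) - target_topics
target_topics.merge(new_topics) unless new_topics.empty?

first_added = new_topics.to_a      # => ["orders"]
tracked     = target_topics.to_a   # => ["greetings", "orders"]

# Asking for the same topics again yields an empty difference.
second_added = (Set.new(["orders"]) - target_topics).to_a   # => []
```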
#disconnect ⇒ Object
    # File 'lib/kafka/cluster.rb', line 78

    def disconnect
      @broker_pool.close
    end
#get_leader(topic, partition) ⇒ Broker
Finds the broker acting as the leader of the given topic and partition.
    # File 'lib/kafka/cluster.rb', line 65

    def get_leader(topic, partition)
      connect_to_broker(get_leader_id(topic, partition))
    end
#mark_as_stale! ⇒ Object
    # File 'lib/kafka/cluster.rb', line 47

    def mark_as_stale!
      @stale = true
    end
#partitions_for(topic) ⇒ Object
    # File 'lib/kafka/cluster.rb', line 69

    def partitions_for(topic)
      add_target_topics([topic])
      cluster_info.partitions_for(topic)
    end
#refresh_metadata! ⇒ Object
    # File 'lib/kafka/cluster.rb', line 51

    def refresh_metadata!
      @cluster_info = nil
      cluster_info
    end
#refresh_metadata_if_necessary! ⇒ Object
    # File 'lib/kafka/cluster.rb', line 56

    def refresh_metadata_if_necessary!
      refresh_metadata! if @stale
    end
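Taken together, mark_as_stale!, refresh_metadata! and refresh_metadata_if_necessary! suggest a lazily refreshed cache guarded by a stale flag. The sketch below illustrates that pattern under stated assumptions: `StaleCacheSketch` and its method names are hypothetical, the fetcher block stands in for a metadata request, and clearing the flag after a successful fetch is an assumption, since this page does not show where `@stale` is reset.

```ruby
# Hypothetical stand-in that mirrors the stale-flag pattern; not library code.
class StaleCacheSketch
  attr_reader :fetch_count

  def initialize(&fetcher)
    @fetcher = fetcher
    @info = nil
    @stale = true      # nothing fetched yet, so start out stale
    @fetch_count = 0
  end

  def mark_as_stale!
    @stale = true
  end

  # Drops the cached value and fetches a fresh one immediately.
  def refresh!
    @info = nil
    info
  end

  # Fetches again only if something marked the cache as stale.
  def refresh_if_necessary!
    refresh! if @stale
  end

  # Memoized accessor: fetches on first use, then reuses the cached value.
  def info
    @info ||= fetch
  end

  private

  def fetch
    @fetch_count += 1
    @stale = false     # assumption: a successful fetch clears the flag
    @fetcher.call
  end
end

cache = StaleCacheSketch.new { { "greetings" => [0, 1] } }
cache.refresh_if_necessary!   # stale at start, so this fetches
cache.refresh_if_necessary!   # fresh, so this is a no-op
cache.mark_as_stale!          # e.g. after a "not leader" style error
cache.refresh_if_necessary!   # fetches again
fetches = cache.fetch_count   # => 2
```

The point of the design is that callers can invoke the conditional refresh before every operation at negligible cost, while error handlers only need to flip the flag.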
#topics ⇒ Object
    # File 'lib/kafka/cluster.rb', line 74

    def topics
      cluster_info.topics.map(&:topic_name)
    end