Class: Kafka::Cluster
- Inherits: Object
- Defined in: lib/kafka/cluster.rb
Overview
A cluster represents the state of a Kafka cluster. It needs to be initialized with a non-empty list of seed brokers. The first seed broker that the cluster can connect to will be asked for the cluster metadata, which allows the cluster to map topic partitions to the current leader for those partitions.
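The bootstrap behaviour described above can be sketched in plain Ruby with the network I/O stubbed out: try each seed broker in order and return the first successful metadata response. `fetch_metadata_from` and the broker addresses below are hypothetical stand-ins for a real Metadata API request, not part of this class's API.

```ruby
# Stub: pretend only "kafka2:9092" is reachable.
def fetch_metadata_from(broker)
  raise "connection refused" unless broker == "kafka2:9092"
  { brokers: ["kafka2:9092"], source: broker }
end

def fetch_cluster_metadata(seed_brokers)
  raise ArgumentError, "At least one seed broker must be configured" if seed_brokers.empty?

  errors = []
  seed_brokers.each do |broker|
    begin
      # The real client would open a connection and issue a Metadata request here.
      return fetch_metadata_from(broker)
    rescue => e
      errors << "#{broker}: #{e.message}"
    end
  end

  raise "Could not connect to any seed broker (#{errors.join('; ')})"
end

metadata = fetch_cluster_metadata(["kafka1:9092", "kafka2:9092"])
```

Only one reachable seed broker is needed; failures are collected so the final error explains every attempt.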
Instance Method Summary
- #add_target_topics(topics) ⇒ Object
- #disconnect ⇒ Object
- #get_group_coordinator(group_id:) ⇒ Object
- #get_leader(topic, partition) ⇒ Broker
  Finds the broker acting as the leader of the given topic and partition.
- #initialize(seed_brokers:, broker_pool:, logger:) ⇒ Cluster (constructor)
  Initializes a Cluster with a set of seed brokers.
- #mark_as_stale! ⇒ Object
- #partitions_for(topic) ⇒ Object
- #refresh_metadata! ⇒ Object
- #refresh_metadata_if_necessary! ⇒ Object
- #topics ⇒ Object
Constructor Details
#initialize(seed_brokers:, broker_pool:, logger:) ⇒ Cluster
Initializes a Cluster with a set of seed brokers.
The cluster will try to fetch cluster metadata from one of the brokers.
# File 'lib/kafka/cluster.rb', line 19

def initialize(seed_brokers:, broker_pool:, logger:)
  if seed_brokers.empty?
    raise ArgumentError, "At least one seed broker must be configured"
  end

  @logger = logger
  @seed_brokers = seed_brokers
  @broker_pool = broker_pool
  @cluster_info = nil
  @stale = true

  # This is the set of topics we need metadata for. If empty, metadata for
  # all topics will be fetched.
  @target_topics = Set.new
end
Instance Method Details
#add_target_topics(topics) ⇒ Object
# File 'lib/kafka/cluster.rb', line 35

def add_target_topics(topics)
  new_topics = Set.new(topics) - @target_topics

  unless new_topics.empty?
    @logger.info "New topics added to target list: #{new_topics.to_a.join(', ')}"

    @target_topics.merge(new_topics)
  end
end
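The Set-difference bookkeeping above can be exercised in isolation; this toy version (a sketch, not the class's method) shows why repeated calls with the same topics are effectively idempotent:

```ruby
require "set"

target_topics = Set.new

# Only topics not already tracked are merged in; returns the newly added set.
def add_target_topics(target_topics, topics)
  new_topics = Set.new(topics) - target_topics
  target_topics.merge(new_topics)
  new_topics
end

added       = add_target_topics(target_topics, ["greetings", "clicks"])
added_again = add_target_topics(target_topics, ["clicks"])  # nothing new
```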
#disconnect ⇒ Object
# File 'lib/kafka/cluster.rb', line 112

def disconnect
  @broker_pool.close
end
#get_group_coordinator(group_id:) ⇒ Object
# File 'lib/kafka/cluster.rb', line 69

def get_group_coordinator(group_id:)
  @logger.debug "Getting group coordinator for `#{group_id}`"

  cluster_info.brokers.each do |broker_info|
    begin
      broker = connect_to_broker(broker_info.node_id)
      response = broker.find_group_coordinator(group_id: group_id)
      Protocol.handle_error(response.error_code)

      coordinator_id = response.coordinator_id
      coordinator = connect_to_broker(coordinator_id)

      @logger.debug "Coordinator for group `#{group_id}` is #{coordinator}"

      return coordinator
    rescue GroupCoordinatorNotAvailable
      @logger.debug "Coordinator not available; retrying in 1s"
      sleep 1
      retry
    rescue ConnectionError => e
      @logger.error "Failed to get group coordinator info from #{broker}: #{e}"
    end
  end

  raise Kafka::Error, "Failed to find group coordinator"
end
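The control flow above combines two error-handling strategies: retry the same broker on a transient "coordinator not available" answer, and skip to the next broker on a connection failure. A simplified, self-contained model (broker behaviours simulated with lambdas, and the `sleep 1` omitted for brevity):

```ruby
CoordinatorNotAvailable = Class.new(StandardError)
BrokerConnectionError   = Class.new(StandardError)

def find_coordinator(brokers)
  brokers.each do |broker|
    attempts = 0
    begin
      return broker.call
    rescue CoordinatorNotAvailable
      attempts += 1
      retry if attempts < 3  # transient: retry this broker a few times
    rescue BrokerConnectionError
      next                   # broker unreachable: try the next one
    end
  end

  raise "Failed to find group coordinator"
end

# Simulated brokers: one dead, one that succeeds on its second attempt.
flaky = lambda do
  @calls = (@calls || 0) + 1
  raise CoordinatorNotAvailable if @calls < 2
  "node-42"
end
dead = lambda { raise BrokerConnectionError }

coordinator = find_coordinator([dead, flaky])
```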
#get_leader(topic, partition) ⇒ Broker
Finds the broker acting as the leader of the given topic and partition.
# File 'lib/kafka/cluster.rb', line 65

def get_leader(topic, partition)
  connect_to_broker(get_leader_id(topic, partition))
end
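Conceptually, the leader lookup resolves a (topic, partition) pair to a node id via the cached metadata, then resolves that id to a broker connection. A hypothetical illustration with both mappings as plain hashes:

```ruby
# Stand-in for cluster metadata: (topic, partition) => leader node id.
leaders = {
  ["greetings", 0] => 1,
  ["greetings", 1] => 2,
}

# Stand-in for a broker pool: node id => lazily built "connection".
connections = Hash.new { |hash, node_id| hash[node_id] = "broker-#{node_id}" }

def get_leader(leaders, connections, topic, partition)
  node_id = leaders.fetch([topic, partition]) do
    raise "No leader known for #{topic}/#{partition}"
  end
  connections[node_id]
end

leader = get_leader(leaders, connections, "greetings", 1)
```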
#mark_as_stale! ⇒ Object
# File 'lib/kafka/cluster.rb', line 47

def mark_as_stale!
  @stale = true
end
#partitions_for(topic) ⇒ Object
# File 'lib/kafka/cluster.rb', line 99

def partitions_for(topic)
  add_target_topics([topic])
  cluster_info.partitions_for(topic)
rescue Kafka::ProtocolError
  mark_as_stale!
  raise
end
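The `rescue`/`raise` pair above implements a mark-stale-on-error pattern: a protocol error flags the cached metadata as stale (so the next call refetches it) while still propagating the error to the caller. A small self-contained model, with `ProtocolError` as a stand-in class:

```ruby
ProtocolError = Class.new(StandardError)

class MetadataCache
  attr_reader :stale

  def initialize
    @stale = false
  end

  def partitions_for(topic)
    raise ProtocolError, "unknown topic" if topic == "missing"
    [0, 1, 2]
  rescue ProtocolError
    @stale = true  # force a metadata refetch on the next lookup
    raise          # the caller still sees the error
  end
end

cache = MetadataCache.new
partitions = cache.partitions_for("greetings")

begin
  cache.partitions_for("missing")
rescue ProtocolError
  # expected: error propagates, cache is now stale
end
```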
#refresh_metadata! ⇒ Object
# File 'lib/kafka/cluster.rb', line 51

def refresh_metadata!
  @cluster_info = nil
  cluster_info
end
#refresh_metadata_if_necessary! ⇒ Object
# File 'lib/kafka/cluster.rb', line 56

def refresh_metadata_if_necessary!
  refresh_metadata! if @stale
end
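Together, `refresh_metadata!`, `refresh_metadata_if_necessary!`, and `mark_as_stale!` form a lazy cache guarded by a stale flag. A sketch of that pattern in isolation (the fetch counter and fake metadata response are for illustration only):

```ruby
class LazyMetadata
  attr_reader :fetches

  def initialize
    @cluster_info = nil
    @stale = true
    @fetches = 0
  end

  def mark_as_stale!
    @stale = true
  end

  def refresh_if_necessary!
    refresh! if @stale
  end

  def refresh!
    @cluster_info = nil  # drop the cache so the next read refetches
    cluster_info
  end

  private

  def cluster_info
    @cluster_info ||= begin
      @fetches += 1
      @stale = false
      { brokers: ["kafka1:9092"] }  # stand-in for a real metadata response
    end
  end
end

meta = LazyMetadata.new
meta.refresh_if_necessary!  # stale on construction: fetches
meta.refresh_if_necessary!  # fresh: no-op
meta.mark_as_stale!
meta.refresh_if_necessary!  # stale again: fetches
```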
#topics ⇒ Object
108 109 110 |
# File 'lib/kafka/cluster.rb', line 108 def topics cluster_info.topics.map(&:topic_name) end |