Class: Kafka::Protocol::MetadataResponse
- Inherits: Object
  - Object
    - Kafka::Protocol::MetadataResponse
- Defined in: lib/kafka/protocol/metadata_response.rb
Overview
A response to a TopicMetadataRequest.
The response contains information on the brokers, topics, and partitions in the cluster.
- For each broker, a node id, host, and port are provided.
- For each topic partition, the node id of the broker acting as partition leader and the list of node ids for the set of replicas are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
API Specification
    MetadataResponse => [Broker][TopicMetadata]
      Broker => NodeId Host Port (any number of brokers may be returned)
        NodeId => int32
        Host => string
        Port => int32
      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16
        TopicName => string
      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]
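The layout above can be exercised with a small, self-contained sketch. Everything in it is illustrative: `SketchDecoder`, the `pack_*` helpers, `sample_response`, and `decode_metadata` are hypothetical names, not part of the library, and plain hashes stand in for BrokerInfo, TopicMetadata, and PartitionMetadata. It assumes the classic Kafka wire encoding (big-endian integers, int16-length-prefixed strings, int32-count-prefixed arrays) and ignores null strings and arrays.

```ruby
# Hypothetical stand-in for the Decoder passed to MetadataResponse.decode.
# Assumes big-endian integers, strings as an int16 length followed by that
# many bytes, and arrays as an int32 count followed by the elements.
# Null strings and arrays (length -1) are not handled in this sketch.
class SketchDecoder
  def initialize(bytes)
    @bytes = bytes
    @pos = 0
  end

  def int16
    read(2).unpack1("s>")
  end

  def int32
    read(4).unpack1("l>")
  end

  def string
    read(int16).force_encoding("UTF-8")
  end

  # Reads the element count, then yields once per element.
  def array
    Array.new(int32) { yield }
  end

  private

  def read(size)
    chunk = @bytes.byteslice(@pos, size)
    @pos += size
    chunk
  end
end

# Encoding helpers, used only to build the sample bytes below.
def pack_int16(value)
  [value].pack("s>")
end

def pack_int32(value)
  [value].pack("l>")
end

def pack_string(text)
  pack_int16(text.bytesize) + text.b
end

def pack_array(encoded_items)
  pack_int32(encoded_items.size) + encoded_items.join
end

# One broker and one topic with a single partition (made-up sample data).
def sample_response
  broker = pack_int32(1) + pack_string("kafka1") + pack_int32(9092)
  partition = pack_int16(0) + pack_int32(0) + pack_int32(1) +
              pack_array([pack_int32(1), pack_int32(2)]) +
              pack_array([pack_int32(1)])
  topic = pack_int16(0) + pack_string("greetings") + pack_array([partition])
  pack_array([broker]) + pack_array([topic])
end

# Reads the fields in the order given by the specification.
def decode_metadata(decoder)
  brokers = decoder.array do
    { node_id: decoder.int32, host: decoder.string, port: decoder.int32 }
  end
  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string
    partitions = decoder.array do
      {
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      }
    end
    { topic_error_code: topic_error_code, topic_name: topic_name, partitions: partitions }
  end
  { brokers: brokers, topics: topics }
end

p decode_metadata(SketchDecoder.new(sample_response))
```

Field order matters: each value is read in the order the grammar lists it, so swapping two reads would silently misinterpret the bytes.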
Defined Under Namespace
Classes: BrokerInfo, PartitionMetadata, TopicMetadata
Instance Attribute Summary
- #brokers ⇒ Array<BrokerInfo> (readonly)
  The list of brokers in the cluster.
- #topics ⇒ Array<TopicMetadata> (readonly)
  The list of topics in the cluster.
Class Method Summary
- .decode(decoder) ⇒ MetadataResponse
  Decodes a MetadataResponse from a Decoder containing response data.
Instance Method Summary
- #find_broker(node_id) ⇒ BrokerInfo
  Finds the broker info for the given node id.
- #find_leader_id(topic, partition) ⇒ Integer
  Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
- #initialize(brokers:, topics:) ⇒ MetadataResponse (constructor)
  A new instance of MetadataResponse.
- #partitions_for(topic_name) ⇒ Object
  Returns the partitions of the given topic; raises UnknownTopicOrPartition if the topic is not in this metadata.
Constructor Details
#initialize(brokers:, topics:) ⇒ MetadataResponse
Returns a new instance of MetadataResponse.
    # File 'lib/kafka/protocol/metadata_response.rb', line 84
    def initialize(brokers:, topics:)
      @brokers = brokers
      @topics = topics
    end
Instance Attribute Details
#brokers ⇒ Array<BrokerInfo> (readonly)
Returns the list of brokers in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 79
    def brokers
      @brokers
    end
#topics ⇒ Array<TopicMetadata> (readonly)
Returns the list of topics in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 82
    def topics
      @topics
    end
Class Method Details
.decode(decoder) ⇒ MetadataResponse
Decodes a MetadataResponse from a Decoder containing response data.
    # File 'lib/kafka/protocol/metadata_response.rb', line 133
    def self.decode(decoder)
      brokers = decoder.array do
        node_id = decoder.int32
        host = decoder.string
        port = decoder.int32

        BrokerInfo.new(
          node_id: node_id,
          host: host,
          port: port
        )
      end

      topics = decoder.array do
        topic_error_code = decoder.int16
        topic_name = decoder.string

        partitions = decoder.array do
          PartitionMetadata.new(
            partition_error_code: decoder.int16,
            partition_id: decoder.int32,
            leader: decoder.int32,
            replicas: decoder.array { decoder.int32 },
            isr: decoder.array { decoder.int32 },
          )
        end

        TopicMetadata.new(
          topic_error_code: topic_error_code,
          topic_name: topic_name,
          partitions: partitions,
        )
      end

      new(brokers: brokers, topics: topics)
    end
Instance Method Details
#find_broker(node_id) ⇒ BrokerInfo
Finds the broker info for the given node id.
    # File 'lib/kafka/protocol/metadata_response.rb', line 115
    def find_broker(node_id)
      @brokers.find {|broker| broker.node_id == node_id }
    end
#find_leader_id(topic, partition) ⇒ Integer
Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
    # File 'lib/kafka/protocol/metadata_response.rb', line 95
    def find_leader_id(topic, partition)
      topic_info = @topics.find {|t| t.topic_name == topic }

      if topic_info.nil?
        raise "no topic #{topic}"
      end

      partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

      if partition_info.nil?
        raise "no partition #{partition} in topic #{topic}"
      end

      partition_info.leader
    end
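The two lookups above are naturally chained: first resolve the leader's node id, then resolve that id to a broker. The following is a minimal sketch of that flow, not library code. `SketchBroker`, `SketchPartition`, `SketchTopic`, `SketchMetadata`, `leader_address`, and `sample_metadata` are hypothetical names introduced so the example runs without the gem, and the sample hosts are made up.

```ruby
# Hypothetical stand-ins for BrokerInfo, PartitionMetadata and TopicMetadata,
# reduced to the fields this sketch needs.
SketchBroker = Struct.new(:node_id, :host, :port, keyword_init: true)
SketchPartition = Struct.new(:partition_id, :leader, keyword_init: true)
SketchTopic = Struct.new(:topic_name, :partitions, keyword_init: true)

# Stand-in carrying the same two lookups as the listings above.
class SketchMetadata
  def initialize(brokers:, topics:)
    @brokers = brokers
    @topics = topics
  end

  def find_broker(node_id)
    @brokers.find { |broker| broker.node_id == node_id }
  end

  def find_leader_id(topic, partition)
    topic_info = @topics.find { |t| t.topic_name == topic }
    raise "no topic #{topic}" if topic_info.nil?

    partition_info = topic_info.partitions.find { |p| p.partition_id == partition }
    raise "no partition #{partition} in topic #{topic}" if partition_info.nil?

    partition_info.leader
  end
end

# Resolves the "host:port" of the partition leader. Returns nil when the
# leader's node id is not in the broker list, because find_broker relies on
# Array#find, which returns nil on no match.
def leader_address(metadata, topic, partition)
  broker = metadata.find_broker(metadata.find_leader_id(topic, partition))
  broker && "#{broker.host}:#{broker.port}"
end

# Made-up sample data: two brokers, one topic with two partitions.
def sample_metadata
  SketchMetadata.new(
    brokers: [
      SketchBroker.new(node_id: 1, host: "kafka1", port: 9092),
      SketchBroker.new(node_id: 2, host: "kafka2", port: 9092),
    ],
    topics: [
      SketchTopic.new(
        topic_name: "greetings",
        partitions: [
          SketchPartition.new(partition_id: 0, leader: 2),
          SketchPartition.new(partition_id: 1, leader: 1),
        ],
      ),
    ],
  )
end

puts leader_address(sample_metadata, "greetings", 0) # prints "kafka2:9092"
```

Note that find_leader_id raises a plain RuntimeError (a bare `raise` with a string) for an unknown topic or partition, so callers that want to recover need to rescue RuntimeError rather than a Kafka-specific error class.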
#partitions_for(topic_name) ⇒ Object
Returns the partitions of the given topic. Raises UnknownTopicOrPartition if the topic is not in this metadata.
    # File 'lib/kafka/protocol/metadata_response.rb', line 119
    def partitions_for(topic_name)
      topic = @topics.find {|t| t.topic_name == topic_name }

      if topic.nil?
        raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
      end

      topic.partitions
    end
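Because #partitions_for raises for an unknown topic instead of returning nil, callers have to decide how to handle the error. Here is one possible sketch, assuming the caller prefers an empty list. `TopicLookupSketch`, `PartitionEntry`, `TopicEntry`, `partition_ids`, and `sample_lookup` are hypothetical names, and the `UnknownTopicOrPartition` class defined here is only a local stand-in for the library's error class.

```ruby
# Local stand-in for the error raised by #partitions_for (an assumption made
# so the sketch runs without the gem).
class UnknownTopicOrPartition < StandardError; end

# Hypothetical stand-ins for PartitionMetadata and TopicMetadata.
PartitionEntry = Struct.new(:partition_id, keyword_init: true)
TopicEntry = Struct.new(:topic_name, :partitions, keyword_init: true)

# Stand-in carrying the same lookup as the listing above.
class TopicLookupSketch
  def initialize(topics:)
    @topics = topics
  end

  def partitions_for(topic_name)
    topic = @topics.find { |t| t.topic_name == topic_name }
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}" if topic.nil?

    topic.partitions
  end
end

# Returns the sorted partition ids of a topic, or an empty list when the
# topic is unknown to this metadata snapshot.
def partition_ids(metadata, topic_name)
  metadata.partitions_for(topic_name).map(&:partition_id).sort
rescue UnknownTopicOrPartition
  []
end

# Made-up sample data: one topic with three partitions, listed out of order.
def sample_lookup
  TopicLookupSketch.new(
    topics: [
      TopicEntry.new(
        topic_name: "greetings",
        partitions: [2, 0, 1].map { |id| PartitionEntry.new(partition_id: id) },
      ),
    ],
  )
end

p partition_ids(sample_lookup, "greetings") # prints [0, 1, 2]
p partition_ids(sample_lookup, "missing")   # prints []
```

Swallowing the error is a design choice, not a recommendation: a caller that treats an unknown topic as a signal to refresh metadata would let the exception propagate instead.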