Class: Kafka::Protocol::MetadataResponse
- Inherits: Object
  - Object
  - Kafka::Protocol::MetadataResponse
- Defined in: lib/kafka/protocol/metadata_response.rb
Overview
A response to a TopicMetadataRequest.
The response contains information on the brokers, topics, and partitions in the cluster.
- For each broker, a node id, host, and port are provided.
- For each topic partition, the node id of the broker acting as partition leader, as well as a list of node ids for the set of replicas, are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
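To make the relationship between leader, replicas, and isr concrete, here is a minimal sketch using plain Ruby Structs. `SketchBroker`, `SketchPartition`, and `lagging_replicas` are hypothetical names chosen for illustration, not the library's classes, and the hosts and ids are made up.

```ruby
# Hypothetical stand-ins for illustration only; not BrokerInfo or PartitionMetadata.
SketchBroker = Struct.new(:node_id, :host, :port, keyword_init: true)
SketchPartition = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)

# Replicas that are not in the isr list, i.e. have not caught up with the leader.
def lagging_replicas(partition)
  partition.replicas - partition.isr
end

brokers = [
  SketchBroker.new(node_id: 1, host: "kafka1.example", port: 9092),
  SketchBroker.new(node_id: 2, host: "kafka2.example", port: 9092),
  SketchBroker.new(node_id: 3, host: "kafka3.example", port: 9092),
]

# Partition 0 is led by broker 1 and replicated on brokers 1, 2 and 3.
# Broker 3 is behind, so it is absent from the isr list.
partition = SketchPartition.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 2])

# The leader is referenced by node id, so resolve it against the broker list.
leader = brokers.find { |b| b.node_id == partition.leader }

puts "leader: #{leader.host}:#{leader.port}"
puts "lagging replicas: #{lagging_replicas(partition).inspect}"
```

Partitions refer to brokers only by node id, which is why the broker list and the partition list are both needed to turn a partition into a network address.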
API Specification
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
    TopicName => string
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
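The specification above can be read as a recipe for parsing bytes. The following is a rough sketch of reading the `[Broker]` part by hand, assuming the usual Kafka wire encoding: big-endian signed integers, strings prefixed with an int16 length, and arrays prefixed with an int32 element count. `SketchReader` and `read_brokers` are hypothetical helpers, not the library's Decoder, and null strings (length -1) are not handled.

```ruby
require "stringio"

# Hypothetical minimal reader for the primitive types named in the specification.
class SketchReader
  def initialize(bytes)
    @io = StringIO.new(bytes)
  end

  # Big-endian signed 16-bit integer.
  def int16
    @io.read(2).unpack1("s>")
  end

  # Big-endian signed 32-bit integer.
  def int32
    @io.read(4).unpack1("l>")
  end

  # An int16 length followed by that many bytes.
  def string
    length = int16
    @io.read(length).force_encoding("UTF-8")
  end

  # An int32 element count followed by the elements, each read by the block.
  def array
    Array.new(int32) { yield }
  end
end

# Broker => NodeId Host Port
def read_brokers(reader)
  reader.array do
    { node_id: reader.int32, host: reader.string, port: reader.int32 }
  end
end

# Hand-encoded list with one broker: node id 1, host "kafka1", port 9092.
bytes = [1].pack("l>") +                 # array length
        [1].pack("l>") +                 # NodeId
        [6].pack("s>") + "kafka1" +      # Host
        [9092].pack("l>")                # Port

p read_brokers(SketchReader.new(bytes))
```

The fields must be read in exactly the order the specification lists them, because the wire format carries no field names or delimiters.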
Defined Under Namespace
Classes: BrokerInfo, PartitionMetadata, TopicMetadata
Instance Attribute Summary
- #brokers ⇒ Array<BrokerInfo> (readonly)
  The list of brokers in the cluster.
- #controller_id ⇒ Integer (readonly)
  The broker id of the controller broker.
- #topics ⇒ Array<TopicMetadata> (readonly)
  The list of topics in the cluster.
Class Method Summary
- .decode(decoder) ⇒ MetadataResponse
  Decodes a MetadataResponse from a Decoder containing response data.
Instance Method Summary
- #find_broker(node_id) ⇒ BrokerInfo
  Finds the broker info for the given node id.
- #find_leader_id(topic, partition) ⇒ Integer
  Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
- #initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse (constructor)
  A new instance of MetadataResponse.
- #partitions_for(topic_name) ⇒ Object
  Returns the partition metadata for the given topic.
Constructor Details
#initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse
Returns a new instance of MetadataResponse.
    # File 'lib/kafka/protocol/metadata_response.rb', line 87

    def initialize(brokers:, controller_id:, topics:)
      @brokers = brokers
      @controller_id = controller_id
      @topics = topics
    end
Instance Attribute Details
#brokers ⇒ Array<BrokerInfo> (readonly)
Returns the list of brokers in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 79

    def brokers
      @brokers
    end
#controller_id ⇒ Integer (readonly)
Returns the broker id of the controller broker.
    # File 'lib/kafka/protocol/metadata_response.rb', line 85

    def controller_id
      @controller_id
    end
#topics ⇒ Array<TopicMetadata> (readonly)
Returns the list of topics in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 82

    def topics
      @topics
    end
Class Method Details
.decode(decoder) ⇒ MetadataResponse
Decodes a MetadataResponse from a Decoder containing response data.
    # File 'lib/kafka/protocol/metadata_response.rb', line 151

    def self.decode(decoder)
      brokers = decoder.array do
        node_id = decoder.int32
        host = decoder.string
        port = decoder.int32
        rack = decoder.string

        BrokerInfo.new(
          node_id: node_id,
          host: host,
          port: port
        )
      end

      controller_id = decoder.int32

      topics = decoder.array do
        topic_error_code = decoder.int16
        topic_name = decoder.string
        is_internal = decoder.boolean

        partitions = decoder.array do
          PartitionMetadata.new(
            partition_error_code: decoder.int16,
            partition_id: decoder.int32,
            leader: decoder.int32,
            replicas: decoder.array { decoder.int32 },
            isr: decoder.array { decoder.int32 },
          )
        end

        TopicMetadata.new(
          topic_error_code: topic_error_code,
          topic_name: topic_name,
          partitions: partitions,
        )
      end

      new(brokers: brokers, controller_id: controller_id, topics: topics)
    end
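The listing reads three values that the API Specification section does not mention: a rack string per broker, a controller id after the broker list, and an is_internal boolean per topic. The rack and is_internal values are read and then discarded. The sketch below replays that read order against a scripted list of values to show why the discarded fields still have to be consumed. `ScriptedDecoder` and `decode_sketch` are hypothetical; they mimic only the method names used in the listing and return plain hashes rather than the library's classes.

```ruby
# Hypothetical decoder that replays scripted values instead of reading bytes.
class ScriptedDecoder
  def initialize(values)
    @values = values.dup
  end

  def next_value
    raise "script exhausted" if @values.empty?
    @values.shift
  end

  alias int16 next_value
  alias int32 next_value
  alias string next_value
  alias boolean next_value

  # The next scripted value is the element count.
  def array
    Array.new(next_value) { yield }
  end
end

# Follows the same read order as the listing, returning plain hashes.
def decode_sketch(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32
    decoder.string # rack: consumed so later reads stay aligned, then dropped
    { node_id: node_id, host: host, port: port }
  end

  controller_id = decoder.int32

  topics = decoder.array do
    error_code = decoder.int16
    name = decoder.string
    decoder.boolean # is_internal: consumed, then dropped
    partitions = decoder.array do
      {
        error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      }
    end
    { error_code: error_code, name: name, partitions: partitions }
  end

  { brokers: brokers, controller_id: controller_id, topics: topics }
end

script = [
  1, 1, "kafka1", 9092, nil,   # one broker; rack is unset
  1,                           # controller id
  1, 0, "events", false,       # one topic, no error, not internal
  1, 0, 0, 1, 1, 1, 1, 1,      # one partition: id 0, leader 1, replicas [1], isr [1]
]

result = decode_sketch(ScriptedDecoder.new(script))
puts result[:topics].first[:name]
```

Skipping either discarded read would shift every later value by one position, so the port of one broker would be parsed as the node id of the next.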
Instance Method Details
#find_broker(node_id) ⇒ BrokerInfo
Finds the broker info for the given node id.
    # File 'lib/kafka/protocol/metadata_response.rb', line 127

    def find_broker(node_id)
      broker = @brokers.find {|broker| broker.node_id == node_id }

      raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?

      broker
    end
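The lookup raises rather than returning nil, so callers need to decide what to do about an unknown node id. A minimal sketch of that calling pattern follows. `LookupBroker`, `SketchNoSuchBroker`, and `find_broker_sketch` are hypothetical stand-ins for BrokerInfo, Kafka::NoSuchBroker, and the method above, so that the example runs without the library.

```ruby
# Hypothetical stand-ins so the example runs without the library.
class SketchNoSuchBroker < StandardError; end
LookupBroker = Struct.new(:node_id, :host, :port)

# Same shape as the method above: linear search, raise when nothing matches.
def find_broker_sketch(brokers, node_id)
  broker = brokers.find { |b| b.node_id == node_id }
  raise SketchNoSuchBroker, "No broker with id #{node_id}" if broker.nil?
  broker
end

brokers = [
  LookupBroker.new(1, "kafka1.example", 9092),
  LookupBroker.new(2, "kafka2.example", 9092),
]

puts find_broker_sketch(brokers, 2).host

begin
  find_broker_sketch(brokers, 7)
rescue SketchNoSuchBroker => e
  # What to do next (retry, refresh, give up) is left to the caller.
  puts "lookup failed: #{e.message}"
end
```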
#find_leader_id(topic, partition) ⇒ Integer
Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
    # File 'lib/kafka/protocol/metadata_response.rb', line 99

    def find_leader_id(topic, partition)
      topic_info = @topics.find {|t| t.topic_name == topic }

      if topic_info.nil?
        raise UnknownTopicOrPartition, "no topic #{topic}"
      end

      Protocol.handle_error(topic_info.topic_error_code)

      partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

      if partition_info.nil?
        raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
      end

      begin
        Protocol.handle_error(partition_info.partition_error_code)
      rescue ReplicaNotAvailable
        # This error can be safely ignored per the protocol specification.
      end

      partition_info.leader
    end
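The method treats error codes selectively: most raise, but a ReplicaNotAvailable error on the partition is swallowed and the leader id is returned anyway. The sketch below isolates that pattern. The numeric codes 3, 5, and 9 are the Kafka protocol codes for unknown topic or partition, leader not available, and replica not available. Everything else here (`handle_error_sketch`, `leader_for_sketch`, the `Sketch*` error classes, `LeaderPartition`) is hypothetical, and how Protocol.handle_error maps codes internally is an assumption.

```ruby
# Hypothetical error classes standing in for the library's own.
class SketchProtocolError < StandardError; end
class SketchUnknownTopicOrPartition < SketchProtocolError; end
class SketchLeaderNotAvailable < SketchProtocolError; end
class SketchReplicaNotAvailable < SketchProtocolError; end

# A small subset of Kafka error codes; 0 means "no error".
SKETCH_ERRORS = {
  3 => SketchUnknownTopicOrPartition,
  5 => SketchLeaderNotAvailable,
  9 => SketchReplicaNotAvailable,
}.freeze

# Assumed behaviour: do nothing for 0, raise a mapped error otherwise.
def handle_error_sketch(code)
  return if code == 0
  raise SKETCH_ERRORS.fetch(code, SketchProtocolError), "error code #{code}"
end

LeaderPartition = Struct.new(:partition_id, :error_code, :leader)

def leader_for_sketch(partitions, partition_id)
  info = partitions.find { |p| p.partition_id == partition_id }
  raise SketchUnknownTopicOrPartition, "no partition #{partition_id}" if info.nil?

  begin
    handle_error_sketch(info.error_code)
  rescue SketchReplicaNotAvailable
    # Ignored on purpose: a missing replica does not invalidate the leader id.
  end

  info.leader
end

partitions = [
  LeaderPartition.new(0, 0, 1),   # healthy
  LeaderPartition.new(1, 9, 2),   # replica not available, leader still known
  LeaderPartition.new(2, 5, -1),  # leader not available
]

puts leader_for_sketch(partitions, 1)

begin
  leader_for_sketch(partitions, 2)
rescue SketchLeaderNotAvailable => e
  puts "no leader yet: #{e.message}"
end
```

Only the one error class is rescued, so every other partition-level error still propagates to the caller.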
#partitions_for(topic_name) ⇒ Object
Returns the partition metadata for the given topic. Raises UnknownTopicOrPartition if the topic is not present in this metadata.
    # File 'lib/kafka/protocol/metadata_response.rb', line 135

    def partitions_for(topic_name)
      topic = @topics.find {|t| t.topic_name == topic_name }

      if topic.nil?
        raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
      end

      Protocol.handle_error(topic.topic_error_code)

      topic.partitions
    end
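One way to combine this method with #find_broker is to map each partition of a topic to the address of its leader. The following is a sketch only. `leader_addresses` is a hypothetical helper that works with any object responding to `partitions_for` and `find_broker`; it assumes that broker objects expose `host` and `port` readers, which the listings on this page do not show. `DemoMetadata`, `DemoBroker`, and `DemoPartition` are stand-ins so the example runs without the library.

```ruby
# Hypothetical helper: partition id => "host:port" of the partition leader.
def leader_addresses(metadata, topic_name)
  metadata.partitions_for(topic_name).each_with_object({}) do |partition, result|
    broker = metadata.find_broker(partition.leader)
    result[partition.partition_id] = "#{broker.host}:#{broker.port}"
  end
end

# Stand-ins that offer only the two methods the helper relies on.
DemoBroker = Struct.new(:node_id, :host, :port)
DemoPartition = Struct.new(:partition_id, :leader)

class DemoMetadata
  def initialize(brokers, partitions_by_topic)
    @brokers = brokers
    @partitions_by_topic = partitions_by_topic
  end

  def partitions_for(topic_name)
    @partitions_by_topic.fetch(topic_name)
  end

  def find_broker(node_id)
    @brokers.find { |b| b.node_id == node_id } || raise(KeyError, "no broker #{node_id}")
  end
end

metadata = DemoMetadata.new(
  [DemoBroker.new(1, "kafka1.example", 9092), DemoBroker.new(2, "kafka2.example", 9092)],
  { "events" => [DemoPartition.new(0, 1), DemoPartition.new(1, 2)] }
)

leader_addresses(metadata, "events").each do |partition_id, address|
  puts "partition #{partition_id} -> #{address}"
end
```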