Class: Kafka::Protocol::MetadataResponse

Inherits:
Object
Defined in:
lib/kafka/protocol/metadata_response.rb

Overview

A response to a TopicMetadataRequest.

The response contains information on the brokers, topics, and partitions in the cluster.

  • For each broker, a node id, host, and port are provided.
  • For each topic partition the node id of the broker acting as partition leader, as well as a list of node ids for the set of replicas, are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
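
Because the isr list is a subset of the replicas, the replicas that have fallen behind the leader can be found with a set difference. The following is a minimal sketch of that idea; `PartitionInfo` and `lagging_replicas` are hypothetical stand-ins introduced for illustration and are not part of the library.

```ruby
# Hypothetical stand-in for a partition's metadata; not the library's class.
PartitionInfo = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)

# Replicas assigned to the partition that are not currently in sync.
def lagging_replicas(partition)
  partition.replicas - partition.isr
end

partition = PartitionInfo.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 3])

lagging = lagging_replicas(partition)
puts "partition #{partition.partition_id}: lagging replicas #{lagging.inspect}"
# prints: partition 0: lagging replicas [2]
```

An empty result means every replica is in sync with the leader.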

API Specification

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32

  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
    TopicName => string

  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
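
To make the layout above concrete, here is a rough sketch that builds a tiny response by hand and reads it back. It assumes the usual Kafka wire conventions: big-endian integers, strings prefixed with an int16 byte length, and arrays prefixed with an int32 element count. `SketchReader` and `str` are hypothetical helpers written for this example; they are not the library's Decoder, and null strings are not handled.

```ruby
require "stringio"

# Hypothetical minimal reader for the primitive types in the specification.
class SketchReader
  def initialize(bytes)
    @io = StringIO.new(bytes)
  end

  # Signed 16-bit big-endian integer.
  def int16
    @io.read(2).unpack1("s>")
  end

  # Signed 32-bit big-endian integer.
  def int32
    @io.read(4).unpack1("l>")
  end

  # An int16 byte length followed by that many bytes.
  def string
    @io.read(int16).force_encoding(Encoding::UTF_8)
  end

  # An int32 element count followed by the elements; the block reads one element.
  def array
    Array.new(int32) { yield }
  end
end

# Hypothetical helper: encode a length-prefixed string.
def str(s)
  [s.bytesize].pack("s>") + s.b
end

bytes = "".b
bytes << [1].pack("l>")                                        # broker count
bytes << [1].pack("l>") << str("kafka1") << [9092].pack("l>")  # NodeId Host Port
bytes << [1].pack("l>")                                        # topic count
bytes << [0].pack("s>") << str("events")                       # TopicErrorCode TopicName
bytes << [1].pack("l>")                                        # partition count
bytes << [0].pack("s>") << [0].pack("l>") << [1].pack("l>")    # PartitionErrorCode PartitionId Leader
bytes << [1].pack("l>") << [1].pack("l>")                      # Replicas: count, then node id 1
bytes << [1].pack("l>") << [1].pack("l>")                      # Isr: count, then node id 1

reader = SketchReader.new(bytes)

brokers = reader.array do
  { node_id: reader.int32, host: reader.string, port: reader.int32 }
end

topics = reader.array do
  error_code = reader.int16
  name = reader.string
  partitions = reader.array do
    {
      error_code: reader.int16,
      partition_id: reader.int32,
      leader: reader.int32,
      replicas: reader.array { reader.int32 },
      isr: reader.array { reader.int32 },
    }
  end
  { error_code: error_code, name: name, partitions: partitions }
end

puts brokers.inspect
puts topics.inspect
```

The fields must be read in exactly the order the specification lists them, since the format carries no field names.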

Defined Under Namespace

Classes: BrokerInfo, PartitionMetadata, TopicMetadata

Constructor Details

#initialize(brokers:, topics:) ⇒ MetadataResponse

Returns a new instance of MetadataResponse.



# File 'lib/kafka/protocol/metadata_response.rb', line 84

def initialize(brokers:, topics:)
  @brokers = brokers
  @topics = topics
end

Instance Attribute Details

#brokers ⇒ Array<BrokerInfo> (readonly)

Returns the list of brokers in the cluster.

Returns:

  • (Array<BrokerInfo>)

    the list of brokers in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 79

def brokers
  @brokers
end

#topics ⇒ Array<TopicMetadata> (readonly)

Returns the list of topics in the cluster.

Returns:

  • (Array<TopicMetadata>)

    the list of topics in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 82

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ MetadataResponse

Decodes a MetadataResponse from a Decoder containing response data.

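Parameters:

  • decoder (Decoder)

    the decoder containing the response data.

Returns:

  • (MetadataResponse)

    the decoded response.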



# File 'lib/kafka/protocol/metadata_response.rb', line 143

def self.decode(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32

    BrokerInfo.new(
      node_id: node_id,
      host: host,
      port: port
    )
  end

  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string

    partitions = decoder.array do
      PartitionMetadata.new(
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      )
    end

    TopicMetadata.new(
      topic_error_code: topic_error_code,
      topic_name: topic_name,
      partitions: partitions,
    )
  end

  new(brokers: brokers, topics: topics)
end

Instance Method Details

#find_broker(node_id) ⇒ BrokerInfo

Finds the broker info for the given node id.

Parameters:

  • node_id (Integer)

    the node id of the broker.

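Returns:

  • (BrokerInfo)

    the broker with the given node id, or nil if no broker in this metadata has that id.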



# File 'lib/kafka/protocol/metadata_response.rb', line 123

def find_broker(node_id)
  @brokers.find {|broker| broker.node_id == node_id }
end

#find_leader_id(topic, partition) ⇒ Integer

Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (Integer)

    the node id of the leader.



# File 'lib/kafka/protocol/metadata_response.rb', line 95

def find_leader_id(topic, partition)
  topic_info = @topics.find {|t| t.topic_name == topic }

  if topic_info.nil?
    raise UnknownTopicOrPartition, "no topic #{topic}"
  end

  Protocol.handle_error(topic_info.topic_error_code)

  partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

  if partition_info.nil?
    raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
  end

  begin
    Protocol.handle_error(partition_info.partition_error_code)
  rescue ReplicaNotAvailable
    # This error can be safely ignored per the protocol specification.
  end

  partition_info.leader
end
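
A common reason to look up a leader id is to find the address of the broker to talk to, which chains the lookup above with a broker lookup by node id. The sketch below mirrors that flow on plain Struct stand-ins so it runs without the library. `SketchBroker`, `SketchPartition`, `SketchTopic`, and `leader_address` are hypothetical names, error-code handling is left out, and KeyError stands in for the library's exception.

```ruby
# Hypothetical stand-ins for the metadata classes; not the library's own.
SketchBroker = Struct.new(:node_id, :host, :port, keyword_init: true)
SketchPartition = Struct.new(:partition_id, :leader, keyword_init: true)
SketchTopic = Struct.new(:topic_name, :partitions, keyword_init: true)

brokers = [
  SketchBroker.new(node_id: 1, host: "kafka1", port: 9092),
  SketchBroker.new(node_id: 2, host: "kafka2", port: 9092),
]

topics = [
  SketchTopic.new(topic_name: "events", partitions: [
    SketchPartition.new(partition_id: 0, leader: 2),
    SketchPartition.new(partition_id: 1, leader: 1),
  ]),
]

# Resolves the "host:port" of the partition leader.
def leader_address(brokers, topics, topic_name, partition_id)
  topic = topics.find { |t| t.topic_name == topic_name }
  raise KeyError, "no topic #{topic_name}" if topic.nil?

  partition = topic.partitions.find { |p| p.partition_id == partition_id }
  raise KeyError, "no partition #{partition_id} in topic #{topic_name}" if partition.nil?

  # The broker lookup returns nil when the id is unknown, so check explicitly.
  broker = brokers.find { |b| b.node_id == partition.leader }
  raise KeyError, "no broker with node id #{partition.leader}" if broker.nil?

  "#{broker.host}:#{broker.port}"
end

address = leader_address(brokers, topics, "events", 0)
puts address
# prints: kafka2:9092
```

The explicit nil check on the broker matters because a lookup by node id does not raise when nothing matches.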

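#partitions_for(topic_name) ⇒ Object

Returns the partition metadata for the given topic. Raises UnknownTopicOrPartition if the topic is not present in this metadata, and raises the corresponding protocol error if the topic carries an error code.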



# File 'lib/kafka/protocol/metadata_response.rb', line 127

def partitions_for(topic_name)
  topic = @topics.find {|t| t.topic_name == topic_name }

  if topic.nil?
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
  end

  Protocol.handle_error(topic.topic_error_code)

  topic.partitions
end