Class: Kafka::Protocol::MetadataResponse

Inherits:
Object
Defined in:
lib/kafka/protocol/metadata_response.rb

Overview

A response to a TopicMetadataRequest.

The response contains information on the brokers, topics, and partitions in the cluster.

  • For each broker, a node id, host, and port are provided.
  • For each topic partition, the response gives the node id of the broker acting as partition leader and the list of node ids for the set of replicas. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.

API Specification

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32

  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
    TopicName => string

  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
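
The layout above can be sketched by building a response body by hand. This is only an illustration, assuming the standard Kafka primitive encodings (big-endian integers, strings prefixed with an int16 byte length, arrays prefixed with an int32 element count). The helper methods and the sample values ("kafka1", "events") are hypothetical and are not part of this library.

```ruby
# Hypothetical helpers for the Kafka primitive encodings (all big-endian).
def int16(n)
  [n].pack("s>")
end

def int32(n)
  [n].pack("l>")
end

# string => int16 byte length followed by the bytes
def string(s)
  int16(s.bytesize) + s.b
end

# array => int32 element count followed by the encoded elements
def array(items)
  items.inject(int32(items.size)) { |bytes, item| bytes + item }
end

# Broker => NodeId Host Port
broker = int32(1) + string("kafka1") + int32(9092)

# PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
partition = int16(0) + int32(0) + int32(1) + array([int32(1)]) + array([int32(1)])

# TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
topic = int16(0) + string("events") + array([partition])

# MetadataResponse => [Broker][TopicMetadata]
response = array([broker]) + array([topic])

puts response.bytesize
```

Each level of the grammar maps to one concatenation, so the nesting of arrays in the byte string mirrors the nesting in the specification.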

Defined Under Namespace

Classes: BrokerInfo, PartitionMetadata, TopicMetadata

Constructor Details

#initialize(brokers:, topics:) ⇒ MetadataResponse

Returns a new instance of MetadataResponse.



# File 'lib/kafka/protocol/metadata_response.rb', line 84

def initialize(brokers:, topics:)
  @brokers = brokers
  @topics = topics
end

Instance Attribute Details

#brokers ⇒ Array<BrokerInfo> (readonly)

Returns the list of brokers in the cluster.

Returns:

  • (Array<BrokerInfo>)

    the list of brokers in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 79

def brokers
  @brokers
end

#topics ⇒ Array<TopicMetadata> (readonly)

Returns the list of topics in the cluster.

Returns:

  • (Array<TopicMetadata>)

    the list of topics in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 82

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ MetadataResponse

Decodes a MetadataResponse from a Decoder containing response data.

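Parameters:

  • decoder (Decoder)

    a decoder containing the response data.

Returns:

  • (MetadataResponse)

    the decoded metadata response.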



# File 'lib/kafka/protocol/metadata_response.rb', line 133

def self.decode(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32

    BrokerInfo.new(
      node_id: node_id,
      host: host,
      port: port
    )
  end

  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string

    partitions = decoder.array do
      PartitionMetadata.new(
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      )
    end

    TopicMetadata.new(
      topic_error_code: topic_error_code,
      topic_name: topic_name,
      partitions: partitions,
    )
  end

  new(brokers: brokers, topics: topics)
end
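
The decoding pattern used above can be tried in isolation. The following is a minimal sketch and not the library's Decoder: SketchDecoder and DecodedBroker are hypothetical stand-ins that implement only the four calls .decode relies on (int16, int32, string, array), assuming big-endian integers, int16-prefixed strings, and int32-prefixed arrays. Null strings (length -1) are not handled.

```ruby
require "stringio"

# Hypothetical stand-in for the decoder passed to .decode.
class SketchDecoder
  def initialize(io)
    @io = io
  end

  def int16
    @io.read(2).unpack1("s>")
  end

  def int32
    @io.read(4).unpack1("l>")
  end

  # Reads an int16 byte length, then that many bytes.
  def string
    @io.read(int16).force_encoding("UTF-8")
  end

  # Reads an int32 element count, then yields once per element.
  def array
    Array.new(int32) { yield }
  end
end

# Hypothetical stand-in for BrokerInfo.
DecodedBroker = Struct.new(:node_id, :host, :port, keyword_init: true)

# One broker: array count 1, node id 1, host "kafka1", port 9092.
bytes = [1].pack("l>") + [1].pack("l>") + [6].pack("s>") + "kafka1".b + [9092].pack("l>")
decoder = SketchDecoder.new(StringIO.new(bytes))

brokers = decoder.array do
  node_id = decoder.int32
  host = decoder.string
  port = decoder.int32

  DecodedBroker.new(node_id: node_id, host: host, port: port)
end

puts brokers.first.host
```

Because the block passed to array reads fields in wire order, the order of the decoder calls inside it has to match the field order in the API specification.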

Instance Method Details

#find_broker(node_id) ⇒ BrokerInfo

Finds the broker info for the given node id.

Parameters:

  • node_id (Integer)

    the node id of the broker.

Returns:

  • (BrokerInfo)

    the broker info, or nil if no broker in this metadata has the given node id.



# File 'lib/kafka/protocol/metadata_response.rb', line 115

def find_broker(node_id)
  @brokers.find {|broker| broker.node_id == node_id }
end

#find_leader_id(topic, partition) ⇒ Integer

Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (Integer)

    the node id of the leader.



# File 'lib/kafka/protocol/metadata_response.rb', line 95

def find_leader_id(topic, partition)
  topic_info = @topics.find {|t| t.topic_name == topic }

  if topic_info.nil?
    raise "no topic #{topic}"
  end

  partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

  if partition_info.nil?
    raise "no partition #{partition} in topic #{topic}"
  end

  partition_info.leader
end
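
A common use of this method is to resolve a partition leader to a broker address by chaining it with #find_broker. The sketch below is self-contained and does not load the library: MetadataSketch, BrokerSketch, PartitionSketch, and TopicSketch are hypothetical stand-ins that copy only the lookup logic shown in this section, and the hosts and topic name are made up.

```ruby
# Hypothetical stand-ins for BrokerInfo, PartitionMetadata and TopicMetadata.
BrokerSketch = Struct.new(:node_id, :host, :port, keyword_init: true)
PartitionSketch = Struct.new(:partition_id, :leader, keyword_init: true)
TopicSketch = Struct.new(:topic_name, :partitions, keyword_init: true)

# Stand-in that mirrors the lookup methods documented here.
class MetadataSketch
  def initialize(brokers:, topics:)
    @brokers = brokers
    @topics = topics
  end

  def find_broker(node_id)
    @brokers.find { |broker| broker.node_id == node_id }
  end

  def find_leader_id(topic, partition)
    topic_info = @topics.find { |t| t.topic_name == topic }
    raise "no topic #{topic}" if topic_info.nil?

    partition_info = topic_info.partitions.find { |p| p.partition_id == partition }
    raise "no partition #{partition} in topic #{topic}" if partition_info.nil?

    partition_info.leader
  end
end

metadata = MetadataSketch.new(
  brokers: [
    BrokerSketch.new(node_id: 1, host: "kafka1", port: 9092),
    BrokerSketch.new(node_id: 2, host: "kafka2", port: 9092),
  ],
  topics: [
    TopicSketch.new(
      topic_name: "events",
      partitions: [
        PartitionSketch.new(partition_id: 0, leader: 1),
        PartitionSketch.new(partition_id: 1, leader: 2),
      ]
    ),
  ]
)

# Resolve the leader of events/1 to a host:port pair.
leader_id = metadata.find_leader_id("events", 1)
leader = metadata.find_broker(leader_id)
leader_address = "#{leader.host}:#{leader.port}"

# An unknown topic raises a RuntimeError carrying the message from the source.
missing_topic_error =
  begin
    metadata.find_leader_id("missing", 0)
    nil
  rescue RuntimeError => e
    e.message
  end

puts leader_address
```

Note that find_broker is built on Enumerable#find, so it returns nil for an unknown node id; callers that chain the two lookups may want to check for that before reading host and port.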

#partitions_for(topic_name) ⇒ Object

Returns the partition metadata for the given topic. Raises UnknownTopicOrPartition if the topic is not present in this metadata.



# File 'lib/kafka/protocol/metadata_response.rb', line 119

def partitions_for(topic_name)
  topic = @topics.find {|t| t.topic_name == topic_name }

  if topic.nil?
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
  end

  topic.partitions
end
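
The returned partition list can be combined with the replicas and isr fields described in the overview, for example to spot partitions where some replica has fallen out of sync. This is a sketch under stated assumptions: PartitionStub, TopicStub, and UnknownTopicStub are hypothetical stand-ins for the library's classes and error, the standalone partitions_for method copies the lookup shown above, and the sample data is invented.

```ruby
# Hypothetical stand-ins for the library's error and metadata classes.
class UnknownTopicStub < StandardError; end

PartitionStub = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)
TopicStub = Struct.new(:topic_name, :partitions, keyword_init: true)

# Same lookup as #partitions_for, written as a standalone method.
def partitions_for(topics, topic_name)
  topic = topics.find { |t| t.topic_name == topic_name }
  raise UnknownTopicStub, "unknown topic #{topic_name}" if topic.nil?

  topic.partitions
end

topics = [
  TopicStub.new(
    topic_name: "events",
    partitions: [
      PartitionStub.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 2, 3]),
      PartitionStub.new(partition_id: 1, leader: 2, replicas: [1, 2, 3], isr: [2, 3]),
    ]
  ),
]

partitions = partitions_for(topics, "events")

# A partition is lagging when at least one replica is missing from the isr list.
lagging_ids = partitions
  .select { |partition| (partition.replicas - partition.isr).any? }
  .map(&:partition_id)

unknown_topic_raised =
  begin
    partitions_for(topics, "missing")
    false
  rescue UnknownTopicStub
    true
  end

puts lagging_ids.inspect
```

Unlike #find_leader_id, which raises a plain RuntimeError, this lookup raises a dedicated error class, so callers can rescue the unknown-topic case specifically.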