Class: Kafka::Protocol::FetchResponse

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/fetch_response.rb

Overview

A response to a fetch request.

API Specification

FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

Defined Under Namespace

Classes: FetchedPartition, FetchedTopic

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topics: []) ⇒ FetchResponse

Returns a new instance of FetchResponse.



41
42
43
# File 'lib/kafka/protocol/fetch_response.rb', line 41

def initialize(topics: [])
  @topics = topics
end

Instance Attribute Details

#topicsObject (readonly)

Returns the value of attribute topics.



39
40
41
# File 'lib/kafka/protocol/fetch_response.rb', line 39

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/kafka/protocol/fetch_response.rb', line 45

def self.decode(decoder)
  topics = decoder.array do
    topic_name = decoder.string

    partitions = decoder.array do
      partition = decoder.int32
      error_code = decoder.int16
      highwater_mark_offset = decoder.int64

      message_set_decoder = Decoder.from_string(decoder.bytes)
      message_set = MessageSet.decode(message_set_decoder)

      FetchedPartition.new(
        partition: partition,
        error_code: error_code,
        highwater_mark_offset: highwater_mark_offset,
        messages: message_set.messages,
      )
    end

    FetchedTopic.new(
      name: topic_name,
      partitions: partitions,
    )
  end

  new(topics: topics)
end