Class: Kafka::Protocol::ListOffsetResponse

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/list_offset_response.rb

Overview

A response to a list offset request.

API Specification

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

Defined Under Namespace

Classes: PartitionOffsetInfo, TopicOffsetInfo

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topics:) ⇒ ListOffsetResponse

Returns a new instance of ListOffsetResponse.



36
37
38
# File 'lib/kafka/protocol/list_offset_response.rb', line 36

def initialize(topics:)
  @topics = topics
end

Instance Attribute Details

#topicsObject (readonly)

Returns the value of attribute topics.



34
35
36
# File 'lib/kafka/protocol/list_offset_response.rb', line 34

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/kafka/protocol/list_offset_response.rb', line 60

def self.decode(decoder)
  topics = decoder.array do
    name = decoder.string

    partition_offsets = decoder.array do
      PartitionOffsetInfo.new(
        partition: decoder.int32,
        error_code: decoder.int16,
        offsets: decoder.array { decoder.int64 },
      )
    end

    TopicOffsetInfo.new(
      name: name,
      partition_offsets: partition_offsets
    )
  end

  new(topics: topics)
end

Instance Method Details

#offset_for(topic, partition) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kafka/protocol/list_offset_response.rb', line 40

def offset_for(topic, partition)
  topic_info = @topics.find {|t| t.name == topic }

  if topic_info.nil?
    raise UnknownTopicOrPartition, "Unknown topic #{topic}"
  end

  partition_info = topic_info
    .partition_offsets
    .find {|p| p.partition == partition }

  if partition_info.nil?
    raise UnknownTopicOrPartition, "Unknown partition #{topic}/#{partition}"
  end

  Protocol.handle_error(partition_info.error_code)

  partition_info.offsets.first
end