Class: Kafka::Protocol::ListOffsetResponse

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/list_offset_response.rb

Overview

A response to a list offset request.

API Specification

OffsetResponse => ThrottleTimeMS [TopicName [PartitionOffsets]]
  ThrottleTimeMS => int32
  TopicName => string
  PartitionOffsets => Partition ErrorCode Timestamp Offset
  Partition => int32
  ErrorCode => int16
  Timestamp => int64
  Offset => int64

Defined Under Namespace

Classes: PartitionOffsetInfo, TopicOffsetInfo

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topics:) ⇒ ListOffsetResponse


41
42
43
# File 'lib/kafka/protocol/list_offset_response.rb', line 41

# Builds a response around an already-decoded collection of topics.
#
# @param topics [Object] stored as-is and exposed through the topics reader;
#   presumably a list of TopicOffsetInfo entries such as the one decode
#   builds — TODO confirm for other callers.
def initialize(topics:)
  @topics = topics
end

Instance Attribute Details

#topics ⇒ Object (readonly)

Returns the value of attribute topics


39
40
41
# File 'lib/kafka/protocol/list_offset_response.rb', line 39

# Returns the topics this response was constructed with.
#
# @return [Object] the value held in @topics, unchanged
def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ Object


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kafka/protocol/list_offset_response.rb', line 65

# Builds a response by reading its fields from the given decoder.
#
# Fields are consumed in this order: an int32 throttle time, then an array of
# topics. Each topic is a string name followed by an array of partition
# entries, and each entry is read as int32 partition, int16 error code,
# int64 timestamp and int64 offset.
#
# @param decoder [#int16, #int32, #int64, #string, #array] source of the
#   encoded fields; presumably decoder.array runs its block once per element
#   and returns the collected results — confirm against the decoder class.
# @return [Object] whatever new(topics: ...) returns for the decoded topics
def self.decode(decoder)
  # The throttle time has to be consumed to reach the topics, but is not kept.
  decoder.int32

  topic_infos = decoder.array do
    topic_name = decoder.string

    offsets = decoder.array do
      # Read into locals first so the order of decoder reads stays explicit.
      partition = decoder.int32
      error_code = decoder.int16
      timestamp = decoder.int64
      offset = decoder.int64

      PartitionOffsetInfo.new(
        partition: partition,
        error_code: error_code,
        timestamp: timestamp,
        offset: offset
      )
    end

    TopicOffsetInfo.new(name: topic_name, partition_offsets: offsets)
  end

  new(topics: topic_infos)
end

Instance Method Details

#offset_for(topic, partition) ⇒ Object


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/kafka/protocol/list_offset_response.rb', line 45

# Looks up the offset reported for one partition of one topic.
#
# The entry's error code is passed to Protocol.handle_error before the offset
# is returned; presumably that call raises for non-success codes — confirm
# against Protocol.
#
# @param topic [Object] compared with == against each topic entry's name
# @param partition [Object] compared with == against each entry's partition
# @return [Object] the offset of the matching partition entry
# @raise [UnknownTopicOrPartition] if no topic entry, or no partition entry
#   within that topic, matches
def offset_for(topic, partition)
  matching_topic = @topics.find { |candidate| candidate.name == topic }
  raise UnknownTopicOrPartition, "Unknown topic #{topic}" if matching_topic.nil?

  entries = matching_topic.partition_offsets
  matching_entry = entries.find { |entry| entry.partition == partition }
  raise UnknownTopicOrPartition, "Unknown partition #{topic}/#{partition}" if matching_entry.nil?

  Protocol.handle_error(matching_entry.error_code)

  matching_entry.offset
end