Class: Kafka::Protocol::OffsetFetchResponse

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/offset_fetch_response.rb

Defined Under Namespace

Classes: PartitionOffsetInfo

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topics:) ⇒ OffsetFetchResponse

Returns a new instance of OffsetFetchResponse.



16
17
18
# File 'lib/kafka/protocol/offset_fetch_response.rb', line 16

def initialize(topics:)
  @topics = topics
end

Instance Attribute Details

#topicsObject (readonly)

Returns the value of attribute topics.



14
15
16
# File 'lib/kafka/protocol/offset_fetch_response.rb', line 14

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/kafka/protocol/offset_fetch_response.rb', line 31

def self.decode(decoder)
  topics = decoder.array {
    topic = decoder.string

    partitions = decoder.array {
      partition = decoder.int32

      info = PartitionOffsetInfo.new(
        offset: decoder.int64,
        metadata: decoder.string,
        error_code: decoder.int16,
      )

      [partition, info]
    }

    [topic, Hash[partitions]]
  }

  new(topics: Hash[topics])
end

Instance Method Details

#offset_for(topic, partition) ⇒ Object



20
21
22
23
24
25
26
27
28
29
# File 'lib/kafka/protocol/offset_fetch_response.rb', line 20

def offset_for(topic, partition)
  offset_info = topics.fetch(topic).fetch(partition, nil)

  if offset_info
    Protocol.handle_error(offset_info.error_code)
    offset_info.offset
  else
    -1
  end
end