Class: Kafka::Protocol::FetchRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/fetch_request.rb

Overview

A request to fetch messages from one or more partitions, grouped by topic.

API Specification

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32

Instance Method Summary collapse

Constructor Details

#initialize(max_wait_time:, min_bytes:, topics:) ⇒ FetchRequest

Returns a new instance of FetchRequest.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • topics (Hash)


22
23
24
25
26
27
# File 'lib/kafka/protocol/fetch_request.rb', line 22

# Builds a fetch request.
#
# The replica id is not caller-configurable; it is always taken from the
# REPLICA_ID constant.
#
# @param max_wait_time [Integer] value written as the MaxWaitTime field.
# @param min_bytes [Integer] value written as the MinBytes field.
# @param topics [Hash] topic name => partitions, where partitions maps a
#   partition number to a config responding to `fetch(:fetch_offset)` and
#   `fetch(:max_bytes)` (this is the shape #encode iterates over).
def initialize(max_wait_time:, min_bytes:, topics:)
  # Caller-supplied tuning values.
  @max_wait_time = max_wait_time
  @min_bytes = min_bytes

  # Fixed by the class rather than by the caller.
  @replica_id = REPLICA_ID

  @topics = topics
end

Instance Method Details

#api_key ⇒ Object



29
30
31
# File 'lib/kafka/protocol/fetch_request.rb', line 29

# Numeric API key identifying this request type.
#
# @return [Integer] always 1.
def api_key
  1
end

#encode(encoder) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kafka/protocol/fetch_request.rb', line 37

# Writes this request to the given encoder in the order
#
#   ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
#
# @param encoder [#write_int32, #write_int64, #write_string, #write_array]
#   sink that receives the fields.
# @raise [KeyError] if a partition config has no :fetch_offset or
#   :max_bytes entry (raised by `fetch`).
# @return [Object] whatever the outer `encoder.write_array` call returns.
def encode(encoder)
  # The three leading fields are all int32 and are written in declaration order.
  [@replica_id, @max_wait_time, @min_bytes].each do |header_field|
    encoder.write_int32(header_field)
  end

  encoder.write_array(@topics) do |topic_name, partition_map|
    encoder.write_string(topic_name)

    encoder.write_array(partition_map) do |partition_id, settings|
      # Look up both values before writing anything, so a missing key
      # raises before any field of this partition has been emitted.
      offset = settings.fetch(:fetch_offset)
      byte_limit = settings.fetch(:max_bytes)

      encoder.write_int32(partition_id)
      encoder.write_int64(offset)
      encoder.write_int32(byte_limit)
    end
  end
end

#response_class ⇒ Object



33
34
35
# File 'lib/kafka/protocol/fetch_request.rb', line 33

# Class associated with responses to this request.
#
# @return [Class] Protocol::FetchResponse.
def response_class
  Protocol::FetchResponse
end