Class: Kafka::Protocol::FetchRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/fetch_request.rb

Overview

A request to fetch messages from a given partition.

API Specification

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32

Instance Method Summary collapse

Constructor Details

#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest

Returns a new instance of FetchRequest.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • topics (Hash)


22
23
24
25
26
27
28
# File 'lib/kafka/protocol/fetch_request.rb', line 22

def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
  @replica_id = REPLICA_ID
  @max_wait_time = max_wait_time
  @min_bytes = min_bytes
  @max_bytes = max_bytes
  @topics = topics
end

Instance Method Details

#api_keyObject



30
31
32
# File 'lib/kafka/protocol/fetch_request.rb', line 30

def api_key
  FETCH_API
end

#api_versionObject



34
35
36
# File 'lib/kafka/protocol/fetch_request.rb', line 34

def api_version
  3
end

#encode(encoder) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/kafka/protocol/fetch_request.rb', line 42

def encode(encoder)
  encoder.write_int32(@replica_id)
  encoder.write_int32(@max_wait_time)
  encoder.write_int32(@min_bytes)
  encoder.write_int32(@max_bytes)

  encoder.write_array(@topics) do |topic, partitions|
    encoder.write_string(topic)

    encoder.write_array(partitions) do |partition, config|
      fetch_offset = config.fetch(:fetch_offset)
      max_bytes = config.fetch(:max_bytes)

      encoder.write_int32(partition)
      encoder.write_int64(fetch_offset)
      encoder.write_int32(max_bytes)
    end
  end
end

#response_classObject



38
39
40
# File 'lib/kafka/protocol/fetch_request.rb', line 38

def response_class
  Protocol::FetchResponse
end