Class: Kafka::Protocol::FetchRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/fetch_request.rb

Overview

A request to fetch messages from one or more partitions, grouped by topic.

API Specification

FetchRequest => ReplicaId MaxWaitTime MinBytes MaxBytes IsolationLevel [TopicName [Partition FetchOffset PartitionMaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  MaxBytes => int32
  IsolationLevel => int8
  TopicName => string
  Partition => int32
  FetchOffset => int64
  PartitionMaxBytes => int32

Constant Summary collapse

ISOLATION_READ_UNCOMMITTED =
0
ISOLATION_READ_COMMITTED =
1

Instance Method Summary collapse

Constructor Details

#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest

Returns a new instance of FetchRequest.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • max_bytes (Integer)
  • topics (Hash)


26
27
28
29
30
31
32
# File 'lib/kafka/protocol/fetch_request.rb', line 26

# Stores the caller-supplied fetch settings on the new request.
#
# The replica id is not configurable here: it is always taken from the
# REPLICA_ID constant.
#
# @param max_wait_time [Integer] written by #encode as the int32 MaxWaitTime field
# @param min_bytes [Integer] written by #encode as the int32 MinBytes field
# @param max_bytes [Integer] request-level limit, written by #encode as an
#   int32 right after MinBytes
# @param topics [Hash] topic name => { partition => config }, where #encode
#   reads :fetch_offset and :max_bytes from each config
def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
  # Fixed value first, then everything handed in by the caller.
  @replica_id = REPLICA_ID

  @topics = topics
  @max_bytes = max_bytes
  @min_bytes = min_bytes
  @max_wait_time = max_wait_time
end

Instance Method Details

#api_key ⇒ Object



34
35
36
# File 'lib/kafka/protocol/fetch_request.rb', line 34

# Returns the API key for this request type.
#
# @return [Object] the value of the FETCH_API constant, which is defined
#   outside this method
def api_key
  FETCH_API
end

#api_version ⇒ Object



38
39
40
# File 'lib/kafka/protocol/fetch_request.rb', line 38

# Returns the fixed API version number of this request.
#
# NOTE(review): presumably the Fetch API version whose wire layout #encode
# follows — confirm against the Kafka protocol reference before changing it.
#
# @return [Integer] always 4
def api_version
  4
end

#encode(encoder) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/kafka/protocol/fetch_request.rb', line 46

# Serializes this request by writing its fields to +encoder+.
#
# Write order: replica id, max wait time, min bytes and request-level max
# bytes (all int32), the isolation level (int8, always
# ISOLATION_READ_COMMITTED), then the topics array. Each topic entry is the
# topic name followed by an array of partitions, and each partition entry is
# partition (int32), fetch offset (int64) and per-partition max bytes (int32).
#
# @param encoder [#write_int8, #write_int32, #write_int64, #write_string, #write_array]
#   the sink the fields are written to
# @raise [KeyError] when a partition config is a Hash lacking +:fetch_offset+
#   or +:max_bytes+ (raised by +fetch+)
# @return [Object] whatever the outer +write_array+ call returns
def encode(encoder)
  # Fixed header: four int32 fields, in wire order.
  [@replica_id, @max_wait_time, @min_bytes, @max_bytes].each do |field|
    encoder.write_int32(field)
  end
  encoder.write_int8(ISOLATION_READ_COMMITTED)

  encoder.write_array(@topics) do |topic_name, partition_configs|
    encoder.write_string(topic_name)

    encoder.write_array(partition_configs) do |partition_id, settings|
      # Look up both settings before writing anything, so a missing key
      # raises before any field of this partition has been written.
      offset, byte_limit = %i[fetch_offset max_bytes].map { |key| settings.fetch(key) }

      encoder.write_int32(partition_id)
      encoder.write_int64(offset)
      encoder.write_int32(byte_limit)
    end
  end
end

#response_class ⇒ Object



42
43
44
# File 'lib/kafka/protocol/fetch_request.rb', line 42

# Returns the constant Protocol::FetchResponse.
#
# NOTE(review): presumably the class callers use to decode the reply to this
# request — confirm against the code that invokes #response_class.
#
# @return [Object] Protocol::FetchResponse
def response_class
  Protocol::FetchResponse
end