Class: Kafka::Protocol::FetchRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::FetchRequest
- Defined in:
- lib/kafka/protocol/fetch_request.rb
Overview
A request to fetch messages from a given partition.
API Specification
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicName => string
Partition => int32
FetchOffset => int64
MaxBytes => int32
Constant Summary collapse
- ISOLATION_READ_UNCOMMITTED =
0
- ISOLATION_READ_COMMITTED =
1
Instance Method Summary collapse
- #api_key ⇒ Object
- #api_version ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest
constructor
A new instance of FetchRequest.
- #response_class ⇒ Object
Constructor Details
#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest
Returns a new instance of FetchRequest.
26 27 28 29 30 31 32 |
# File 'lib/kafka/protocol/fetch_request.rb', line 26 def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) @replica_id = REPLICA_ID @max_wait_time = max_wait_time @min_bytes = min_bytes @max_bytes = max_bytes @topics = topics end |
Instance Method Details
#api_key ⇒ Object
34 35 36 |
# File 'lib/kafka/protocol/fetch_request.rb', line 34 def api_key FETCH_API end |
#api_version ⇒ Object
38 39 40 |
# File 'lib/kafka/protocol/fetch_request.rb', line 38 def api_version 4 end |
#encode(encoder) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/kafka/protocol/fetch_request.rb', line 46 def encode(encoder) encoder.write_int32(@replica_id) encoder.write_int32(@max_wait_time) encoder.write_int32(@min_bytes) encoder.write_int32(@max_bytes) encoder.write_int8(ISOLATION_READ_COMMITTED) encoder.write_array(@topics) do |topic, partitions| encoder.write_string(topic) encoder.write_array(partitions) do |partition, config| fetch_offset = config.fetch(:fetch_offset) max_bytes = config.fetch(:max_bytes) encoder.write_int32(partition) encoder.write_int64(fetch_offset) encoder.write_int32(max_bytes) end end end |
#response_class ⇒ Object
42 43 44 |
# File 'lib/kafka/protocol/fetch_request.rb', line 42 def response_class Protocol::FetchResponse end |