Class: Kafka::Protocol::ProduceRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::ProduceRequest
- Defined in:
- lib/kafka/protocol/produce_request.rb
Overview
A produce request sends a message set to the server.
API Specification
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
Instance Attribute Summary collapse
-
#messages_for_topics ⇒ Object
readonly
Returns the value of attribute messages_for_topics.
-
#required_acks ⇒ Object
readonly
Returns the value of attribute required_acks.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
Instance Method Summary collapse
- #encode(encoder) ⇒ Object
-
#initialize(required_acks:, timeout:, messages_for_topics:) ⇒ ProduceRequest
constructor
A new instance of ProduceRequest.
-
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all.
Constructor Details
#initialize(required_acks:, timeout:, messages_for_topics:) ⇒ ProduceRequest
Returns a new instance of ProduceRequest.
30 31 32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 30 def initialize(required_acks:, timeout:, messages_for_topics:) @required_acks = required_acks @timeout = timeout @messages_for_topics = end |
Instance Attribute Details
#messages_for_topics ⇒ Object (readonly)
Returns the value of attribute messages_for_topics.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def @messages_for_topics end |
#required_acks ⇒ Object (readonly)
Returns the value of attribute required_acks.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def required_acks @required_acks end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def timeout @timeout end |
Instance Method Details
#encode(encoder) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/kafka/protocol/produce_request.rb', line 44 def encode(encoder) encoder.write_int16(@required_acks) encoder.write_int32(@timeout) encoder.write_array(@messages_for_topics) do |topic, | encoder.write_string(topic) encoder.write_array() do |partition, | # When encoding the message set into the request, the bytesize of the message # set must precede the actual bytes. Therefore we need to encode the entire # message set into a separate buffer first. = () encoder.write_int32(partition) # When encoding bytes, the 32 bit size of the byte buffer is encoded first. encoder.write_bytes() end end end |
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.
40 41 42 |
# File 'lib/kafka/protocol/produce_request.rb', line 40 def requires_acks? @required_acks != 0 end |