Class: Kafka::Protocol::ProduceRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::ProduceRequest
- Defined in:
- lib/kafka/protocol/produce_request.rb
Overview
A produce request sends a message set to the server.
API Specification
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
Instance Attribute Summary collapse
-
#messages_for_topics ⇒ Object
readonly
Returns the value of attribute messages_for_topics.
-
#required_acks ⇒ Object
readonly
Returns the value of attribute required_acks.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
Instance Method Summary collapse
- #api_key ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(required_acks:, timeout:, messages_for_topics:) ⇒ ProduceRequest
constructor
A new instance of ProduceRequest.
-
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all.
- #response_class ⇒ Object
Constructor Details
#initialize(required_acks:, timeout:, messages_for_topics:) ⇒ ProduceRequest
Returns a new instance of ProduceRequest.
33 34 35 36 37 |
# File 'lib/kafka/protocol/produce_request.rb', line 33 def initialize(required_acks:, timeout:, messages_for_topics:) @required_acks = required_acks @timeout = timeout @messages_for_topics = end |
Instance Attribute Details
#messages_for_topics ⇒ Object (readonly)
Returns the value of attribute messages_for_topics.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def @messages_for_topics end |
#required_acks ⇒ Object (readonly)
Returns the value of attribute required_acks.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def required_acks @required_acks end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
28 29 30 |
# File 'lib/kafka/protocol/produce_request.rb', line 28 def timeout @timeout end |
Instance Method Details
#api_key ⇒ Object
39 40 41 |
# File 'lib/kafka/protocol/produce_request.rb', line 39 def api_key PRODUCE_API end |
#encode(encoder) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/kafka/protocol/produce_request.rb', line 55 def encode(encoder) encoder.write_int16(@required_acks) encoder.write_int32(@timeout) encoder.write_array(@messages_for_topics) do |topic, | encoder.write_string(topic) encoder.write_array() do |partition, | encoder.write_int32(partition) # When encoding the message set into the request, the bytesize of the message # set must precede the actual data. Therefore we need to encode the entire # message set into a separate buffer first. = Encoder.encode_with() encoder.write_bytes() end end end |
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.
51 52 53 |
# File 'lib/kafka/protocol/produce_request.rb', line 51 def requires_acks? @required_acks != 0 end |
#response_class ⇒ Object
43 44 45 |
# File 'lib/kafka/protocol/produce_request.rb', line 43 def response_class requires_acks? ? Protocol::ProduceResponse : nil end |