Class: Kafka::ProduceOperation
- Inherits: Object
  - Object
  - Kafka::ProduceOperation
- Defined in: lib/kafka/produce_operation.rb
Overview
A produce operation attempts to send all messages in a buffer to the Kafka cluster. Since topics and partitions are spread among all brokers in a cluster, this usually involves sending requests to several or all of the brokers.
Instrumentation
When executing the operation, an ack_messages.producer.kafka notification will be emitted for each message set that was successfully appended to a topic partition. The following keys will be found in the payload:
- :topic - the topic that was written to.
- :partition - the partition that the message set was appended to.
- :offset - the offset of the first message in the message set.
- :message_count - the number of messages that were appended.
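A subscriber might consume these payloads roughly as sketched below. This is only an illustration: DemoNotifier and subscribe_ack_tally are hypothetical stand-ins written for this example, not part of the library, and the payload values are made up. It assumes the notifications reach application code through some subscribe-by-name interface.

```ruby
# Hypothetical stand-in for a notification bus; not part of the library.
class DemoNotifier
  def initialize
    @subscribers = Hash.new { |hash, name| hash[name] = [] }
  end

  # Register a block to be called with the payload of each named event.
  def subscribe(name, &block)
    @subscribers[name] << block
  end

  # Deliver a payload hash to every subscriber of the named event.
  def publish(name, payload)
    @subscribers[name].each { |subscriber| subscriber.call(payload) }
  end
end

# Hypothetical helper: tallies appended messages per [topic, partition]
# using the documented payload keys. Returns the hash being filled in.
def subscribe_ack_tally(notifier)
  appended = Hash.new(0)
  notifier.subscribe("ack_messages.producer.kafka") do |payload|
    key = [payload[:topic], payload[:partition]]
    appended[key] += payload[:message_count]
  end
  appended
end

notifier = DemoNotifier.new
appended = subscribe_ack_tally(notifier)

# Simulated payloads for illustration only.
notifier.publish("ack_messages.producer.kafka", topic: "greetings", partition: 0, offset: 42, message_count: 3)
notifier.publish("ack_messages.producer.kafka", topic: "greetings", partition: 0, offset: 45, message_count: 2)
notifier.publish("ack_messages.producer.kafka", topic: "greetings", partition: 1, offset: 7, message_count: 1)

appended.each do |(topic, partition), count|
  puts "#{topic}/#{partition}: #{count} messages appended"
end
```

Because one notification is emitted per message set, a single operation can produce several events for the same topic partition, which is why the sketch accumulates counts rather than overwriting them.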
In addition to these notifications, a send_messages.producer.kafka notification will be emitted after the operation completes, regardless of whether it succeeds. This notification will have the following keys:
- :message_count - the total number of messages that the operation tried to send. Note that not all messages may get delivered.
- :sent_message_count - the number of messages that were successfully sent.
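Since not all messages may get delivered, the difference between the two keys gives the number still undelivered. A minimal sketch, assuming the payload is a plain Hash with the two documented keys; delivery_summary is a hypothetical helper, not part of the library:

```ruby
# Hypothetical helper: summarizes a send_messages.producer.kafka payload.
def delivery_summary(payload)
  attempted = payload.fetch(:message_count)
  sent = payload.fetch(:sent_message_count)
  { attempted: attempted, sent: sent, undelivered: attempted - sent }
end

# Example values are made up for illustration.
summary = delivery_summary(message_count: 10, sent_message_count: 7)
puts "#{summary[:undelivered]} of #{summary[:attempted]} messages were not delivered"
```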
Instance Method Summary
- #execute ⇒ Object
- #initialize(cluster:, buffer:, compression_codec:, compression_threshold:, required_acks:, ack_timeout:, logger:) ⇒ ProduceOperation (constructor)
  A new instance of ProduceOperation.
Constructor Details
#initialize(cluster:, buffer:, compression_codec:, compression_threshold:, required_acks:, ack_timeout:, logger:) ⇒ ProduceOperation
Returns a new instance of ProduceOperation.
    # File 'lib/kafka/produce_operation.rb', line 28

    def initialize(cluster:, buffer:, compression_codec:, compression_threshold:, required_acks:, ack_timeout:, logger:)
      @cluster = cluster
      @buffer = buffer
      @required_acks = required_acks
      @ack_timeout = ack_timeout
      @compression_codec = compression_codec
      @compression_threshold = compression_threshold
      @logger = logger
    end
Instance Method Details
#execute ⇒ Object
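    # File 'lib/kafka/produce_operation.rb', line 38

    def execute
      Instrumentation.instrument("send_messages.producer.kafka") do |notification|
        # Remember how many messages were buffered before the attempt.
        message_count = @buffer.size

        notification[:message_count] = message_count

        begin
          # Attempt to deliver the buffered messages to the brokers.
        ensure
          # Runs even if delivery raises: whatever has left the buffer was sent.
          notification[:sent_message_count] = message_count - @buffer.size
        end
      end
    end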
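The begin/ensure shape of #execute is what lets the sent count be reported even when delivery fails partway. The standalone sketch below mimics that pattern without the library. instrument_sketch and execute_sketch are hypothetical stand-ins, the buffer is a plain Array, and the assumption that the instrumentation helper reports the payload even when its block raises is mine, based on the statement that the notification is emitted regardless of success.

```ruby
# Hypothetical stand-in for the instrumentation helper: yields a payload hash
# and hands it to the callback even when the block raises.
def instrument_sketch(on_finish)
  payload = {}
  begin
    yield payload
  ensure
    on_finish.call(payload)
  end
end

# Mirrors the shape of #execute: record the attempted count up front and
# compute the sent count in an ensure clause. The caller's block plays the
# role of the delivery step and is expected to remove sent items from buffer.
def execute_sketch(buffer, on_finish)
  instrument_sketch(on_finish) do |notification|
    message_count = buffer.size
    notification[:message_count] = message_count
    begin
      yield buffer
    ensure
      notification[:sent_message_count] = message_count - buffer.size
    end
  end
end

reports = []
buffer = %w[a b c d]

begin
  execute_sketch(buffer, ->(payload) { reports << payload }) do |pending|
    pending.shift(3)           # pretend three messages were delivered
    raise "broker unavailable" # then the delivery attempt fails
  end
rescue RuntimeError
  # The error still reaches the caller; the payload was reported first.
end

p reports
```

In this sketch the messages left in the buffer after a failure are exactly the ones that were not sent, so a caller could retry with the same buffer.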