Class: Kafka::ProduceOperation

Inherits:
Object
Defined in:
lib/kafka/produce_operation.rb

Overview

A produce operation attempts to send all messages in a buffer to the Kafka cluster. Since topics and partitions are spread among all brokers in a cluster, this usually involves sending requests to several or all of the brokers.

Instrumentation

When executing the operation, an ack_messages.producer.kafka notification will be emitted for each message set that was successfully appended to a topic partition. The following keys will be found in the payload:

  • :topic — the topic that was written to.
  • :partition — the partition that the message set was appended to.
  • :offset — the offset of the first message in the message set.
  • :message_count — the number of messages that were appended.
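As a rough illustration of consuming this payload, the sketch below turns one into a log line. It uses only plain Ruby and a hand-built payload hash. The helper name `format_ack` is hypothetical, and wiring it to the notification (for example through an ActiveSupport::Notifications subscriber) is an assumption about the host application.

```ruby
# Hypothetical helper: formats an ack_messages.producer.kafka payload.
# Only the four documented payload keys are read.
def format_ack(payload)
  "#{payload.fetch(:topic)}/#{payload.fetch(:partition)}: " \
    "#{payload.fetch(:message_count)} message(s) appended " \
    "starting at offset #{payload.fetch(:offset)}"
end

# Hand-built example payload (illustrative values, not real broker output).
sample_ack = { topic: "greetings", partition: 3, offset: 42, message_count: 5 }
puts format_ack(sample_ack)
```

Using `fetch` rather than `[]` makes a missing key fail loudly instead of silently interpolating an empty string.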

In addition to these notifications, a send_messages.producer.kafka notification will be emitted after the operation completes, regardless of whether it succeeds. This notification will have the following keys:

  • :message_count – the total number of messages that the operation tried to send. Note that some of these messages may not be delivered.
  • :sent_message_count – the number of messages that were successfully sent.
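The two counts can be combined to see how many messages were left undelivered. The following is a minimal sketch under the assumption that the payload arrives as a Hash with the keys listed above. `delivery_summary` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: summarizes a send_messages.producer.kafka payload.
def delivery_summary(payload)
  attempted = payload.fetch(:message_count)
  sent = payload.fetch(:sent_message_count)
  { attempted: attempted, sent: sent, undelivered: attempted - sent }
end

# Hand-built example payload (illustrative values only).
summary = delivery_summary({ message_count: 10, sent_message_count: 7 })
puts "sent #{summary[:sent]} of #{summary[:attempted]}, " \
     "#{summary[:undelivered]} undelivered"
```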

Instance Method Summary

Constructor Details

#initialize(cluster:, buffer:, compression_codec:, compression_threshold:, required_acks:, ack_timeout:, logger:) ⇒ ProduceOperation

Returns a new instance of ProduceOperation.



# File 'lib/kafka/produce_operation.rb', line 28

def initialize(cluster:, buffer:, compression_codec:, compression_threshold:, required_acks:, ack_timeout:, logger:)
  @cluster = cluster
  @buffer = buffer
  @required_acks = required_acks
  @ack_timeout = ack_timeout
  @compression_codec = compression_codec
  @compression_threshold = compression_threshold
  @logger = logger
end

Instance Method Details

#execute ⇒ Object



# File 'lib/kafka/produce_operation.rb', line 38

def execute
  Instrumentation.instrument("send_messages.producer.kafka") do |notification|
    message_count = @buffer.size

    notification[:message_count] = message_count

    begin
      send_buffered_messages
    ensure
      notification[:sent_message_count] = message_count - @buffer.size
    end
  end
end
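
The `ensure` clause is what lets the notification report a sent count even when sending raises. The sketch below reproduces that pattern in isolation. `FakeBuffer` and `execute_sketch` are hypothetical stand-ins, and the sketch assumes (as the subtraction in `execute` implies) that delivered messages are removed from the buffer.

```ruby
# Stand-in for the message buffer; only #size matters to the pattern.
class FakeBuffer
  def initialize(messages)
    @messages = messages
  end

  def size
    @messages.size
  end

  # Simulates delivering n messages by removing them from the buffer.
  def deliver(n)
    @messages.shift(n)
  end
end

# Mirrors the shape of #execute: record the attempted count up front,
# then compute the sent count in `ensure` so it is set on success or failure.
def execute_sketch(buffer, notification)
  message_count = buffer.size
  notification[:message_count] = message_count

  begin
    yield buffer
  ensure
    notification[:sent_message_count] = message_count - buffer.size
  end
end

notification = {}
buffer = FakeBuffer.new(%w[a b c d])

begin
  execute_sketch(buffer, notification) do |buf|
    buf.deliver(3)
    raise "broker unavailable" # simulated failure after a partial send
  end
rescue RuntimeError => e
  puts "send failed: #{e.message}"
end

p notification
```

The error still propagates to the caller. The `ensure` clause only guarantees that the payload is filled in before it does.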