Class: Kafka::ProduceOperation

Inherits:
Object
Defined in:
lib/kafka/produce_operation.rb

Overview

A produce operation attempts to send all messages in a buffer to the Kafka cluster. Since topics and partitions are spread among all brokers in a cluster, this usually involves sending requests to several or all of the brokers.

Instrumentation

When executing the operation, an ack_message.producer.kafka notification will be emitted for each message that was successfully appended to a topic partition. The following keys will be found in the payload:

  • :topic – the topic that was written to.
  • :partition – the partition that the message set was appended to.
  • :offset – the offset of the message in the partition.
  • :key – the message key.
  • :value – the message value.
  • :delay – the time between when the message was produced and when it was acknowledged.

In addition to these notifications, a send_messages.producer.kafka notification will be emitted after the operation completes, regardless of whether it succeeds. This notification will have the following keys:

  • :message_count – the total number of messages that the operation tried to send. Note that not all messages may get delivered.
  • :sent_message_count – the number of messages that were successfully sent.
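As a rough illustration of how the two payloads described above might be consumed, the sketch below defines two helpers and only subscribes them when ActiveSupport is loaded. It assumes the notifications are delivered through ActiveSupport::Notifications, which this page does not state. The helper names format_ack and undelivered_count and the sample values are made up for the example, and the unit of :delay is not specified here, so it is printed as a bare number.

```ruby
# Hypothetical helper: formats an ack_message.producer.kafka payload as one log line.
def format_ack(payload)
  format(
    "acked %s/%d@%d key=%p delay=%.3f",
    payload[:topic], payload[:partition], payload[:offset],
    payload[:key], payload[:delay]
  )
end

# Hypothetical helper: messages the operation tried to send but did not deliver,
# derived from a send_messages.producer.kafka payload.
def undelivered_count(payload)
  payload[:message_count] - payload[:sent_message_count]
end

# Assumption: notifications go through ActiveSupport::Notifications.
# The guard keeps this file runnable when ActiveSupport is not loaded.
if defined?(ActiveSupport::Notifications)
  ActiveSupport::Notifications.subscribe("ack_message.producer.kafka") do |_name, _start, _finish, _id, payload|
    puts format_ack(payload)
  end

  ActiveSupport::Notifications.subscribe("send_messages.producer.kafka") do |_name, _start, _finish, _id, payload|
    puts "undelivered: #{undelivered_count(payload)}"
  end
end

# Sample payloads with made-up values, shaped like the key lists above.
ack = { topic: "greetings", partition: 0, offset: 42, key: "k1", value: "hello", delay: 0.25 }
summary = { message_count: 5, sent_message_count: 3 }

puts format_ack(ack)
puts "undelivered: #{undelivered_count(summary)}"
```

Keeping the formatting logic in plain methods means it can be exercised with hand-built hashes, independent of however the notifications are actually delivered.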

Constructor Details

#initialize(cluster:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:) ⇒ ProduceOperation

Returns a new instance of ProduceOperation.



# File 'lib/kafka/produce_operation.rb', line 30

def initialize(cluster:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:)
  @cluster = cluster
  @buffer = buffer
  @required_acks = required_acks
  @ack_timeout = ack_timeout
  @compressor = compressor
  @logger = logger
  @instrumenter = instrumenter
end

Instance Method Details

#execute ⇒ Object



# File 'lib/kafka/produce_operation.rb', line 40

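def execute
  @instrumenter.instrument("send_messages.producer") do |notification|
    # Number of messages in the buffer before anything is sent.
    message_count = @buffer.size

    notification[:message_count] = message_count

    begin
      send_buffered_messages
    ensure
      # Runs even if sending raises: messages that have left the buffer
      # are counted as sent.
      notification[:sent_message_count] = message_count - @buffer.size
    end
  end
end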
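
The begin/ensure pattern in #execute can be tried in isolation. The following is a minimal sketch, not the library's code: RecordingInstrumenter and execute_sketch are hypothetical stand-ins, a plain Array plays the role of the buffer, and the send step is simulated by removing two messages and then raising. It shows that :sent_message_count is still recorded when the send fails partway through.

```ruby
# Hypothetical stand-in for the instrumenter: yields a payload hash to the
# block and records the event name and payload, even if the block raises.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name)
    payload = {}
    yield payload
  ensure
    @events << [name, payload]
  end
end

# Mirrors the structure of #execute, with a simulated send step that
# delivers two messages and then fails.
def execute_sketch(instrumenter, buffer)
  instrumenter.instrument("send_messages.producer") do |notification|
    message_count = buffer.size
    notification[:message_count] = message_count

    begin
      2.times { buffer.shift }           # two messages leave the buffer
      raise IOError, "broker unavailable" # simulated failure
    ensure
      # Still runs after the failure above.
      notification[:sent_message_count] = message_count - buffer.size
    end
  end
end

instrumenter = RecordingInstrumenter.new
buffer = %w[a b c d e]

begin
  execute_sketch(instrumenter, buffer)
rescue IOError => e
  puts "send failed: #{e.message}"
end

name, payload = instrumenter.events.first
puts "#{name}: tried #{payload[:message_count]}, sent #{payload[:sent_message_count]}"
```

In this sketch the error still propagates to the caller; the ensure block only guarantees that the payload is complete before the notification is recorded.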