Class: Kafka::Producer

Inherits:
Object
Defined in:
lib/kafka/producer.rb

Overview

Allows sending messages to a Kafka cluster.

Typically you won't instantiate this class yourself, but rather have Client do it for you, e.g.

# Will instantiate Kafka::Client
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])

# Will instantiate Kafka::Producer
producer = kafka.producer

This is done in order to share a logger as well as a pool of broker connections across different producers. This also means that you don't need to pass the cluster and logger options to #producer. See #initialize for the list of other options you can pass in.

Buffering

The producer buffers pending messages until #deliver_messages is called. Note that there is a maximum buffer size (default is 1,000 messages) and writing messages after the buffer has reached this size will result in a BufferOverflow exception. Make sure to periodically call #deliver_messages or set max_buffer_size to an appropriate value.

Buffering messages and sending them in batches greatly improves performance, so try to avoid sending messages after every write. The tradeoff between throughput and message delays depends on your use case.
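A minimal sketch of tuning the buffer, assuming kafka is the Kafka::Client from the overview example and events is an array of message strings (both are assumptions for illustration):

# Allow up to 5,000 buffered messages before Kafka::BufferOverflow is raised.
producer = kafka.producer(max_buffer_size: 5_000)

events.each_with_index do |event, index|
  producer.produce(event, topic: "events")

  # Flush in batches of 100 rather than after every message.
  producer.deliver_messages if (index + 1) % 100 == 0
end

# Deliver whatever is left in the buffer.
producer.deliver_messages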

Error Handling and Retries

The design of the error handling is based on having a MessageBuffer hold messages for all topics/partitions. Whenever we want to send messages to the cluster, we group the buffered messages by the broker they need to be sent to and fire off a request to each broker. A request can be a partial success, so we go through the response and inspect the error code for each partition that we wrote to. If the write to a given partition was successful, we clear the corresponding messages from the buffer -- otherwise, we log the error and keep the messages in the buffer.

After this, we check if the buffer is empty. If it is, we're all done. If it's not, we do another round of requests, this time with just the remaining messages. We do this for as long as max_retries permits.
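A minimal sketch of handling a delivery that still fails after all retries, assuming producer has already been set up; whether to retain or drop the remaining messages is an application decision:

begin
  producer.deliver_messages
rescue Kafka::DeliveryFailed => e
  # Messages that could not be written remain in the buffer, so a later
  # deliver_messages call would retry them. Here we log and drop them instead.
  $stderr.puts "Delivery failed: #{e.message}"
  producer.clear_buffer
end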

Compression

Depending on what kind of data you produce, enabling compression may yield improved bandwidth and space usage. Compression in Kafka is done on entire message sets rather than on individual messages. This improves the compression rate and generally means that compression works better the larger your buffers get, since the message sets will be larger by the time they're compressed.

Since many workloads have variations in throughput and distribution across partitions, it's possible to configure a threshold for when to enable compression by setting compression_threshold. Only if the defined number of messages are buffered for a partition will the messages be compressed.

Compression is enabled by passing the compression_codec parameter with the name of one of the algorithms allowed by Kafka:

  • :snappy for Snappy compression.
  • :gzip for gzip compression.
  • :lz4 for LZ4 compression.
  • :zstd for zstd compression.

By default, all message sets will be compressed if you specify a compression codec. To increase the compression threshold, set compression_threshold to an integer value higher than one.
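For example, enabling Snappy compression only once at least ten messages are buffered for a partition (the threshold value here is arbitrary):

producer = kafka.producer(
  compression_codec: :snappy,
  compression_threshold: 10,
)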

Instrumentation

Whenever #produce is called, the notification produce_message.producer.kafka will be emitted with the following payload:

  • value – the message value.
  • key – the message key.
  • topic – the topic that was produced to.
  • buffer_size – the buffer size after adding the message.
  • max_buffer_size – the maximum allowed buffer size for the producer.

After #deliver_messages completes, the notification deliver_messages.producer.kafka will be emitted with the following payload:

  • message_count – the total number of messages that the producer tried to deliver. Note that not all messages may get delivered.
  • delivered_message_count – the number of messages that were successfully delivered.
  • attempts – the number of attempts made to deliver the messages.
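A sketch of consuming these notifications, assuming ActiveSupport::Notifications is the instrumentation backend in use:

require "active_support/notifications"

ActiveSupport::Notifications.subscribe("deliver_messages.producer.kafka") do |_name, _start, _finish, _id, payload|
  delivered = payload[:delivered_message_count]
  total = payload[:message_count]

  puts "Delivered #{delivered} of #{total} messages in #{payload[:attempts]} attempt(s)"
end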

Example

This is an example of an application which reads lines from stdin and writes them to Kafka:

require "kafka"

logger = Logger.new($stderr)
brokers = ENV.fetch("KAFKA_BROKERS").split(",")

# Make sure to create this topic in your Kafka cluster or configure the
# cluster to auto-create topics.
topic = "random-messages"

kafka = Kafka.new(brokers, client_id: "simple-producer", logger: logger)
producer = kafka.producer

begin
  $stdin.each_with_index do |line, index|
    producer.produce(line, topic: topic)

    # Deliver messages every 10 lines.
    producer.deliver_messages if index % 10 == 0
  end
ensure
  # Make sure to send any remaining messages.
  producer.deliver_messages

  producer.shutdown
end

Defined Under Namespace

Classes: AbortTransaction

Instance Method Summary

Constructor Details

#initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:, interceptors: []) ⇒ Producer

Returns a new instance of Producer.



# File 'lib/kafka/producer.rb', line 133

def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
               required_acks:, max_retries:, retry_backoff:, max_buffer_size:,
               max_buffer_bytesize:, partitioner:, interceptors: [])
  @cluster = cluster
  @transaction_manager = transaction_manager
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor
  @partitioner = partitioner
  @interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

  # The set of topics that are produced to.
  @target_topics = Set.new

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end

Instance Method Details

#abort_transaction ⇒ nil

This method aborts the pending transaction and marks all the produced records as aborted. The records will be discarded by the brokers, and consumers will not see them unless they are configured to read uncommitted messages.

This method can only be called while the current transaction is in the IN_TRANSACTION state.

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 337

def abort_transaction
  @transaction_manager.abort_transaction
end

#begin_transaction ⇒ nil

Marks the beginning of a transaction. This method transitions the transaction state to IN_TRANSACTION.

All producing operations can only be executed while the transaction is in this state. The records are persisted by the Kafka brokers but are not visible to consumers until #commit_transaction is called. If the transaction is not committed within the timeout period, it times out and is considered aborted.
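A sketch of the manual transaction flow, assuming the producer was created with transactional options (e.g. transactional: true and a transactional_id); the topic name is illustrative:

producer.init_transactions

producer.begin_transaction
producer.produce("registered", topic: "user-events")
producer.deliver_messages
producer.commit_transaction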

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 313

def begin_transaction
  @transaction_manager.begin_transaction
end

#buffer_bytesize ⇒ Object



# File 'lib/kafka/producer.rb', line 275

def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end

#buffer_size ⇒ Integer

Returns the number of messages currently held in the buffer.

Returns:

  • (Integer)

    buffer size.



# File 'lib/kafka/producer.rb', line 271

def buffer_size
  @pending_message_queue.size + @buffer.size
end

#clear_buffer ⇒ nil

Deletes all buffered messages.

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 282

def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end

#commit_transaction ⇒ nil

This method commits the pending transaction and marks all the produced records as committed. After that, they are visible to consumers.

This method can only be called while the current transaction is in the IN_TRANSACTION state.

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 324

def commit_transaction
  @transaction_manager.commit_transaction
end

#deliver_messages ⇒ nil

Sends all buffered messages to the Kafka brokers.

Depending on the value of required_acks used when initializing the producer, this call may block until the specified number of replicas have acknowledged the writes. The ack_timeout setting places an upper bound on the amount of time the call will block before failing.
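For example, a producer that waits for all in-sync replicas to acknowledge each write, with a 10-second cap on how long #deliver_messages may block (the values are illustrative):

producer = kafka.producer(
  required_acks: :all,  # wait for all in-sync replicas
  ack_timeout: 10,      # seconds to wait for acknowledgements
  max_retries: 3,
  retry_backoff: 1,
)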

Returns:

  • (nil)

Raises:

  • (DeliveryFailed)

    if not all messages could be successfully sent.



# File 'lib/kafka/producer.rb', line 250

def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0

  @instrumenter.instrument("deliver_messages.producer") do |notification|
    message_count = buffer_size

    notification[:message_count] = message_count
    notification[:attempts] = 0

    begin
      deliver_messages_with_retries(notification)
    ensure
      notification[:delivered_message_count] = message_count - buffer_size
    end
  end
end

#init_transactions ⇒ nil

Initializes the producer for future transactions. This method should be called once, before any transactions are created.

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 299

def init_transactions
  @transaction_manager.init_transactions
end

#produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) ⇒ nil

Produces a message to the specified topic. Note that messages are buffered in the producer until #deliver_messages is called.

Partitioning

There are several options for specifying the partition that the message should be written to.

The simplest option is to not specify a message key, partition key, or partition number, in which case the message will be assigned a partition at random.

You can also specify the partition parameter yourself. This requires you to know which partitions are available, however. Oftentimes the best option is to specify the partition_key parameter: messages with the same partition key will always be assigned to the same partition, as long as the number of partitions doesn't change. You can also omit the partition key and specify a message key instead. The message key is part of the message payload, and so can carry semantic value--whether you want to have the message key double as a partition key is up to you.
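A few examples of the different partitioning options (topic and key values are illustrative):

# Random partition:
producer.produce("hello", topic: "greetings")

# Messages with the same partition key always go to the same partition:
producer.produce("hello", topic: "greetings", partition_key: "user-123")

# A message key that also serves as the partitioning input:
producer.produce("hello", topic: "greetings", key: "user-123")

# Explicit partition number:
producer.produce("hello", topic: "greetings", partition: 0)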

Parameters:

  • value (String)

    the message data.

  • key (String) (defaults to: nil)

    the message key.

  • headers (Hash<String, String>) (defaults to: {})

    the headers for the message.

  • topic (String)

    the topic that the message should be written to.

  • partition (Integer) (defaults to: nil)

    the partition that the message should be written to.

  • partition_key (String) (defaults to: nil)

    the key that should be used to assign a partition.

  • create_time (Time) (defaults to: Time.now)

    the timestamp that should be set on the message.

Returns:

  • (nil)

Raises:



# File 'lib/kafka/producer.rb', line 195

def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  message = @interceptors.call(PendingMessage.new(
    value: value && value.to_s,
    key: key && key.to_s,
    headers: headers,
    topic: topic,
    partition: partition && Integer(partition),
    partition_key: partition_key && partition_key.to_s,
    create_time: create_time
  ))

  if buffer_size >= @max_buffer_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end

  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end

  # If the producer is in transactional mode, messages can only be
  # produced while a transaction is in progress.
  if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
    raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages"
  end

  @target_topics.add(topic)
  @pending_message_queue.write(message)

  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })

  nil
end

#send_offsets_to_transaction(batch:, group_id:) ⇒ nil

Sends the batch's last offset to the consumer group coordinator and also marks this offset as part of the current transaction. This offset will be considered committed only if the transaction is committed successfully.

This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. The specified group_id should therefore match the group_id configured for the consumer.
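A sketch of a consume-transform-produce loop, assuming kafka is the shared Kafka::Client, the producer is transactional and already initialized, and the consumer options shown exist in your ruby-kafka version; topic names and the group id are illustrative:

group_id = "transform-group"

consumer = kafka.consumer(group_id: group_id)
consumer.subscribe("source-topic")

consumer.each_batch(automatically_mark_as_processed: false) do |batch|
  producer.transaction do
    batch.messages.each do |message|
      producer.produce(message.value.upcase, topic: "destination-topic")
    end

    producer.deliver_messages
    producer.send_offsets_to_transaction(batch: batch, group_id: group_id)
  end
end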

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 351

def send_offsets_to_transaction(batch:, group_id:)
  @transaction_manager.send_offsets_to_txn(offsets: { batch.topic => { batch.partition => { offset: batch.last_offset + 1, leader_epoch: batch.leader_epoch } } }, group_id: group_id)
end

#shutdown ⇒ nil

Closes all connections to the brokers.

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 290

def shutdown
  @transaction_manager.close
  @cluster.disconnect
end

#to_s ⇒ Object



# File 'lib/kafka/producer.rb', line 160

def to_s
  "Producer #{@target_topics.to_a.join(', ')}"
end

#transaction ⇒ nil

Syntactic sugar to enable easier transaction usage. It performs the following steps:

  • Start the transaction (with Producer#begin_transaction)
  • Yield the given block
  • Commit the transaction (with Producer#commit_transaction)

If the block raises an exception, the transaction is automatically aborted before the exception is re-raised.

If the block raises the Kafka::Producer::AbortTransaction indicator exception, the transaction is aborted silently, without re-raising that exception.
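A sketch of the block form, where order_valid? stands in for a hypothetical application check:

producer.transaction do
  producer.produce("order-created", topic: "orders")
  producer.deliver_messages

  # Aborts the transaction without re-raising:
  raise Kafka::Producer::AbortTransaction unless order_valid?
end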

Returns:

  • (nil)


# File 'lib/kafka/producer.rb', line 368

def transaction
  raise 'This method requires a block' unless block_given?
  begin_transaction
  yield
  commit_transaction
rescue Kafka::Producer::AbortTransaction
  abort_transaction
rescue
  abort_transaction
  raise
end