Method: Kafka::Producer#deliver_messages

Defined in:
lib/kafka/producer.rb

#deliver_messages ⇒ nil

Sends all buffered messages to the Kafka brokers.

Depending on the value of required_acks used when initializing the producer, this call may block until the specified number of replicas have acknowledged the writes. The ack_timeout setting places an upper bound on the amount of time the call will block before failing.

Returns:

  • (nil)

Raises:

  • (DeliveryFailed)

    if one or more messages could not be delivered.



# File 'lib/kafka/producer.rb', line 250

def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0

  @instrumenter.instrument("deliver_messages.producer") do |notification|
    message_count = buffer_size

    notification[:message_count] = message_count
    notification[:attempts] = 0

    begin
      deliver_messages_with_retries(notification)
    ensure
      notification[:delivered_message_count] = message_count - buffer_size
    end
  end
end
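
The begin/ensure accounting in the method above can be tried in isolation. The following is a minimal sketch, not the library's implementation: SketchProducer, its produce method, the broker block, and the top-level DeliveryFailed class are hypothetical stand-ins, and the single delivery attempt replaces the real retry logic in deliver_messages_with_retries. It only illustrates that delivered_message_count is recorded even when delivery raises.

```ruby
# Hypothetical stand-in for the error class raised by the real producer.
class DeliveryFailed < StandardError; end

# Hypothetical stand-in producer: an in-memory buffer plus a "broker" block
# that returns true when it accepts a message.
class SketchProducer
  attr_reader :last_notification

  def initialize(&broker)
    @broker = broker
    @buffer = []
    @last_notification = {}
  end

  def produce(message)
    @buffer << message
  end

  def buffer_size
    @buffer.size
  end

  def deliver_messages
    # Nothing to do if the buffer is empty.
    return if buffer_size == 0

    message_count = buffer_size
    notification = { message_count: message_count, attempts: 0 }
    @last_notification = notification

    begin
      notification[:attempts] += 1
      # Drop accepted messages; rejected ones stay in the buffer.
      @buffer.reject! { |message| @broker.call(message) }
      raise DeliveryFailed, "#{buffer_size} message(s) not delivered" unless @buffer.empty?
    ensure
      # Runs on success and on failure, so the count is always recorded.
      notification[:delivered_message_count] = message_count - buffer_size
    end

    nil
  end
end

# The stand-in broker rejects the message "bad" and accepts everything else.
producer = SketchProducer.new { |message| message != "bad" }
producer.produce("a")
producer.produce("bad")
producer.produce("b")

begin
  producer.deliver_messages
rescue DeliveryFailed => e
  puts "failed: #{e.message}"
end

puts producer.last_notification[:delivered_message_count]
```

In this sketch the rejected message remains in the buffer after the failure, so buffer_size is 1 and the recorded delivered_message_count is 2. A caller could inspect buffer_size after rescuing the error to decide whether to retry.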