Class: Kafka::Producer
- Inherits: Object
- Defined in: lib/kafka/producer.rb
Overview
Allows sending messages to a Kafka cluster.
Buffering
The producer buffers pending messages until #send_messages is called. Note that there is
a maximum buffer size (default is 1,000 messages), and calling #produce once the
buffer has reached this size raises a BufferOverflow exception. Make sure
to call #send_messages periodically or set max_buffer_size to an appropriate value.
Buffering messages and sending them in batches greatly improves performance, so try to avoid sending messages after every write. The tradeoff between throughput and message delays depends on your use case.
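To make the buffering rules concrete, here is a minimal, self-contained toy model that does not talk to Kafka. ToyProducer and the local BufferOverflow class are hypothetical stand-ins; only the rule that #produce refuses new messages once the buffer holds max_buffer_size entries (and the error message it raises) is taken from the #produce source.

```ruby
# Toy model of the producer's bounded buffer. ToyProducer and this
# BufferOverflow class are hypothetical stand-ins, not ruby-kafka classes.
class BufferOverflow < StandardError; end

class ToyProducer
  attr_reader :sent_batches

  def initialize(max_buffer_size: 1000)
    @max_buffer_size = max_buffer_size
    @buffer = []
    @sent_batches = []
  end

  def buffer_size
    @buffer.size
  end

  # Same guard as Kafka::Producer#produce: refuse writes once the buffer is full.
  def produce(value)
    unless buffer_size < @max_buffer_size
      raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
    end
    @buffer << value
    nil
  end

  # Stand-in for delivery: record the whole buffer as one batch, then clear it.
  def send_messages
    @sent_batches << @buffer.dup unless @buffer.empty?
    @buffer.clear
    nil
  end
end

# Flushing every 3 messages keeps a 3-message buffer from overflowing.
producer = ToyProducer.new(max_buffer_size: 3)
7.times do |i|
  producer.produce("message #{i}")
  producer.send_messages if producer.buffer_size == 3
end
producer.send_messages # flush the remainder

# Without periodic flushing, the third write to a 2-message buffer fails.
overflowing = ToyProducer.new(max_buffer_size: 2)
overflow_message = nil
begin
  3.times { |i| overflowing.produce("message #{i}") }
rescue BufferOverflow => e
  overflow_message = e.message
end
```

Each send_messages call in the first loop delivers a batch of up to three messages rather than one request per message, which is the batching trade-off described above.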
Error Handling and Retries
The design of the error handling is based on having a MessageBuffer hold messages for all topics/partitions. Whenever we want to send messages to the cluster, we group the buffered messages by the broker they need to be sent to and fire off a request to each broker. A request can be a partial success, so we go through the response and inspect the error code for each partition that we wrote to. If the write to a given partition was successful, we clear the corresponding messages from the buffer; otherwise, we log the error and keep the messages in the buffer.
After this, we check if the buffer is empty. If it is, we're all done. If it's
not, we do another round of requests, this time with just the remaining messages.
We do this for as long as max_retries permits.
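The retry loop can be sketched in plain Ruby as follows. This is a simplified model, assuming the buffer is a hash from [topic, partition] pairs to pending messages and that one round of requests is represented by a deliver callable that returns the pairs whose writes failed. send_with_retries, deliver and the hash layout are hypothetical; the real producer uses MessageBuffer and Transmission instead.

```ruby
# Simplified model of the producer's retry loop. The buffer maps
# [topic, partition] pairs to pending messages, and `deliver` stands in for
# one round of broker requests, returning the pairs whose writes failed.
# These names and shapes are hypothetical, not ruby-kafka's API.
def send_with_retries(buffer, max_retries:, retry_backoff:, deliver:)
  attempt = 0

  loop do
    attempt += 1
    failed = deliver.call(buffer.keys, attempt)

    # A request can partially succeed: drop partitions that were written,
    # keep the ones that failed for the next round.
    buffer.keep_if { |topic_partition, _messages| failed.include?(topic_partition) }

    # Stop when everything was delivered or the retries are used up.
    break if buffer.empty? || attempt > max_retries

    sleep retry_backoff
  end

  attempt
end

# One partition fails on the first attempt only, so a single retry suffices.
buffer = {
  ["events", 0] => ["a", "b"],
  ["events", 1] => ["c"],
  ["clicks", 0] => ["d"],
}
flaky = ->(keys, attempt) { attempt == 1 ? keys & [["events", 1]] : [] }
attempts = send_with_retries(buffer, max_retries: 2, retry_backoff: 0, deliver: flaky)

# A partition that never succeeds gets 1 initial attempt + max_retries retries,
# and its messages stay in the buffer.
stuck = { ["events", 2] => ["e"] }
always_fail = ->(keys, _attempt) { keys }
stuck_attempts = send_with_retries(stuck, max_retries: 2, retry_backoff: 0, deliver: always_fail)
```

With max_retries: 2 (the constructor default), a persistently failing partition is attempted three times in total, which matches the attempt <= @max_retries check in the #send_messages source.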
Example
This is an example of an application which reads lines from stdin and writes them to Kafka:
require "kafka"
require "logger"

logger = Logger.new($stderr)
brokers = ENV.fetch("KAFKA_BROKERS").split(",")

# Make sure to create this topic in your Kafka cluster or configure the
# cluster to auto-create topics.
topic = "random-messages"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "simple-producer",
  logger: logger,
)

producer = kafka.get_producer

begin
  $stdin.each_with_index do |line, index|
    producer.produce(line, topic: topic)

    # Send the buffered messages every 10 lines.
    producer.send_messages if index % 10 == 0
  end
ensure
  begin
    # Make sure to send any remaining messages.
    producer.send_messages
  ensure
    # Close the broker connections even if the final send raises.
    producer.shutdown
  end
end
Instance Method Summary
- #buffer_size ⇒ Integer
  Returns the number of messages currently held in the buffer.
- #initialize(broker_pool:, logger:, ack_timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000) ⇒ Producer (constructor)
  Initializes a new Producer.
- #produce(value, key: nil, topic:, partition: nil, partition_key: nil) ⇒ nil
  Produces a message to the specified topic.
- #send_messages ⇒ nil
  Sends all buffered messages to the Kafka brokers.
- #shutdown ⇒ nil
  Closes all connections to the brokers.
Constructor Details
#initialize(broker_pool:, logger:, ack_timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000) ⇒ Producer
Initializes a new Producer.
# File 'lib/kafka/producer.rb', line 94
def initialize(broker_pool:, logger:, ack_timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
  @broker_pool = broker_pool
  @logger = logger
  @required_acks = required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @buffer = MessageBuffer.new
end
Instance Method Details
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
# File 'lib/kafka/producer.rb', line 210
def buffer_size
  @buffer.size
end
#produce(value, key: nil, topic:, partition: nil, partition_key: nil) ⇒ nil
Produces a message to the specified topic. Note that messages are buffered in the producer until #send_messages is called.
Partitioning
There are several options for specifying the partition that the message should be written to.
The simplest option is to not specify a message key, partition key, or partition number, in which case the message will be assigned a partition at random.
You can also specify the partition parameter yourself. This requires you to
know which partitions are available, however. Often the best option is
to specify the partition_key parameter: messages with the same partition
key will always be assigned to the same partition, as long as the number of
partitions doesn't change. You can also omit the partition key and specify
a message key instead; when no partition key is given, the message key is
used to pick the partition. Unlike the partition key, the message key is part
of the message payload and can therefore carry semantic value; whether you
want it to double as a partition key is up to you.
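A sketch of how a partition might be chosen for a message, under stated assumptions. The precedence (explicit partition, then partition_key, then the message key) follows the #produce source; the CRC32-modulo hashing and the random fallback for keyless messages are assumptions about the Partitioner class, whose source is not shown on this page, and choose_partition is a hypothetical helper.

```ruby
require "zlib"

# Hypothetical helper mirroring the precedence in Kafka::Producer#produce.
# ASSUMPTION: keys are hashed with CRC32 modulo the partition count, and a
# message with no key at all goes to a random partition.
def choose_partition(partition_count, key: nil, partition: nil, partition_key: nil)
  # An explicit partition number always wins.
  return partition unless partition.nil?

  # Fall back to the message key when no partition key is given.
  partition_key ||= key

  if partition_key.nil?
    rand(partition_count)
  else
    Zlib.crc32(partition_key) % partition_count
  end
end

explicit = choose_partition(6, partition: 4, partition_key: "user-42")
by_partition_key = choose_partition(6, partition_key: "user-42")
by_message_key = choose_partition(6, key: "user-42")
overridden_key = choose_partition(6, key: "other", partition_key: "user-42")
random = choose_partition(6)
```

Because the result in this sketch depends on partition_count, adding partitions to a topic can move an existing key to a different partition, which is why the same-partition guarantee only holds while the number of partitions stays the same.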
# File 'lib/kafka/producer.rb', line 134
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
  unless buffer_size < @max_buffer_size
    raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
  end

  if partition.nil?
    # If no explicit partition key is specified we use the message key instead.
    partition_key ||= key
    partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
    partition = partitioner.partition_for_key(partition_key)
  end

  message = Protocol::Message.new(key: key, value: value)

  @buffer.write(message, topic: topic, partition: partition)

  partition
end
#send_messages ⇒ nil
Sends all buffered messages to the Kafka brokers.
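Depending on the value of required_acks used when initializing the producer,
this call may block until the specified number of replicas have acknowledged
the writes. The ack_timeout setting places an upper bound on the amount of
time the call will block before failing. If some messages still cannot be
delivered after max_retries retries, they are kept in the buffer and a
FailedToSendMessages exception is raised.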
# File 'lib/kafka/producer.rb', line 162
def send_messages
  attempt = 0

  transmission = Transmission.new(
    broker_pool: @broker_pool,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    logger: @logger,
  )

  loop do
    @logger.info "Sending #{@buffer.size} messages"

    attempt += 1
    transmission.send_messages

    if @buffer.empty?
      @logger.info "Successfully transmitted all messages"
      break
    elsif attempt <= @max_retries
      @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
      @logger.info "Waiting #{@retry_backoff}s before retrying"

      sleep @retry_backoff
    else
      @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
      break
    end
  end

  if @required_acks == 0
    # No response is returned by the brokers, so we can't know which messages
    # have been successfully written. Our only option is to assume that they all
    # have.
    @buffer.clear
  end

  unless @buffer.empty?
    partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")

    raise FailedToSendMessages, "Failed to send messages to #{partitions}"
  end
end
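The bookkeeping that follows the retry loop can be modeled in isolation. This sketch assumes a hypothetical hash-shaped buffer as a stand-in for MessageBuffer; finish_send and the local FailedToSendMessages class are illustrative, and only the required_acks == 0 rule and the error message format are taken from the source above.

```ruby
# Stand-in for the library's FailedToSendMessages error (hypothetical here).
class FailedToSendMessages < StandardError; end

# Models the tail of #send_messages. The buffer is assumed to be a hash from
# [topic, partition] pairs to pending messages.
def finish_send(buffer, required_acks:)
  # With required_acks == 0 the brokers return no response, so the producer
  # cannot tell what was written and treats every message as delivered.
  buffer.clear if required_acks == 0

  unless buffer.empty?
    partitions = buffer.keys.map { |topic, partition| "#{topic}/#{partition}" }.join(", ")
    raise FailedToSendMessages, "Failed to send messages to #{partitions}"
  end

  nil
end

# Leftover messages with acks enabled: the caller gets an exception naming them.
leftover = { ["events", 1] => ["c"], ["clicks", 0] => ["d"] }
error_message = nil
begin
  finish_send(leftover, required_acks: 1)
rescue FailedToSendMessages => e
  error_message = e.message
end

# With required_acks: 0 the same situation is silently treated as success.
fire_and_forget = { ["events", 1] => ["c"] }
result = finish_send(fire_and_forget, required_acks: 0)
```

The practical consequence is that with required_acks: 0, #send_messages never raises FailedToSendMessages, so any messages the brokers did not actually write are dropped without an error.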
#shutdown ⇒ nil
Closes all connections to the brokers.
# File 'lib/kafka/producer.rb', line 217
def shutdown
  @broker_pool.shutdown
end