Method: Kafka::Producer#produce

Defined in:
lib/kafka/producer.rb

#produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) ⇒ nil

Produces a message to the specified topic. Note that messages are buffered in the producer until #deliver_messages is called.
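The buffer-then-deliver contract described above can be sketched with a small stand-in class. SketchProducer, its delivered reader, and its internals are hypothetical and exist only for illustration; this is not how Kafka::Producer is implemented, and no broker is involved.

```ruby
# Hypothetical stand-in for illustration only; not Kafka::Producer itself.
class SketchProducer
  attr_reader :delivered

  def initialize
    @buffer = []     # messages waiting to be sent
    @delivered = []  # messages that have been handed off
  end

  # Mirrors the documented contract: buffer the message and return nil.
  def produce(value, topic:, key: nil)
    @buffer << { value: value.to_s, key: key, topic: topic }
    nil
  end

  # Flushes everything buffered so far.
  def deliver_messages
    @delivered.concat(@buffer)
    @buffer.clear
    nil
  end

  def buffer_size
    @buffer.size
  end
end

producer = SketchProducer.new
producer.produce("hello", topic: "greetings")
producer.produce("world", topic: "greetings")
# Nothing has been sent yet; both messages sit in the buffer.
producer.deliver_messages
# The buffer is now empty and both messages have been handed off.
```

With the real producer the same ordering applies: calling #produce alone sends nothing until #deliver_messages is called.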

Partitioning

There are several options for specifying the partition that the message should be written to.

The simplest option is to not specify a message key, partition key, or partition number, in which case the message will be assigned a partition at random.

You can also specify the partition parameter yourself, although this requires you to know which partitions are available. Often the best option is to specify the partition_key parameter: messages with the same partition key will always be assigned to the same partition, as long as the number of partitions doesn't change. You can also omit the partition key and specify a message key instead, in which case the message key determines the partition. The message key is part of the message payload, and so can carry semantic value; whether you want the message key to double as a partition key is up to you.
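The precedence between these options can be sketched as follows. The choose_partition helper is hypothetical, and the CRC32-modulo hashing is an assumption made for illustration; the partitioner configured on the client decides the actual assignment.

```ruby
require "zlib"

# Hypothetical helper; the precedence and hashing scheme are assumptions
# for illustration, not a copy of the library's partitioner.
def choose_partition(partition_count, partition: nil, partition_key: nil, key: nil)
  # An explicit partition number wins over everything else.
  return Integer(partition) if partition

  # Otherwise prefer the partition key, falling back to the message key.
  hash_key = partition_key || key

  # With no key at all, pick a partition at random.
  return rand(partition_count) if hash_key.nil?

  # The same key maps to the same partition while partition_count is stable.
  Zlib.crc32(hash_key.to_s) % partition_count
end

choose_partition(8, partition: 3)                      # explicit partition
choose_partition(8, partition_key: "user-42")          # stable for "user-42"
choose_partition(8, key: "user-42")                    # message key as fallback
choose_partition(8)                                    # random partition
```

Because the result depends on partition_count, adding partitions to a topic can move a key to a different partition, which is the caveat noted in the text.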

Parameters:

  • value (String)

    the message data.

  • key (String) (defaults to: nil)

    the message key.

  • headers (Hash<String, String>) (defaults to: {})

    the headers for the message.

  • topic (String)

    the topic that the message should be written to.

  • partition (Integer) (defaults to: nil)

    the partition that the message should be written to.

  • partition_key (String) (defaults to: nil)

    the key that should be used to assign a partition.

  • create_time (Time) (defaults to: Time.now)

    the timestamp that should be set on the message.

Returns:

  • (nil)

Raises:

  • (BufferOverflow)

    if the maximum buffer size has been reached.

# File 'lib/kafka/producer.rb', line 195

def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  message = @interceptors.call(PendingMessage.new(
    value: value && value.to_s,
    key: key && key.to_s,
    headers: headers,
    topic: topic,
    partition: partition && Integer(partition),
    partition_key: partition_key && partition_key.to_s,
    create_time: create_time
  ))

  if buffer_size >= @max_buffer_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end

  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end

  # If the producer is in transactional mode, messages can only be
  # produced while a transaction is in progress
  if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
    raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages"
  end

  @target_topics.add(topic)
  @pending_message_queue.write(message)

  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })

  nil
end
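The buffer limit checks in the method above mean that callers may need to flush and retry when the buffer is full. The following is a minimal sketch of that pattern under stated assumptions: SketchBufferOverflow, BoundedProducer, and produce_with_flush are hypothetical names that stand in for the real producer and its error class, so the snippet runs without a broker.

```ruby
# Hypothetical stand-ins; names are illustrative, not the library's classes.
class SketchBufferOverflow < StandardError; end

class BoundedProducer
  attr_reader :delivered

  def initialize(max_buffer_size:)
    @max_buffer_size = max_buffer_size
    @buffer = []
    @delivered = []
  end

  # Refuses new messages once the buffer holds max_buffer_size messages.
  def produce(value, topic:)
    if @buffer.size >= @max_buffer_size
      raise SketchBufferOverflow,
        "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
    end
    @buffer << [topic, value.to_s]
    nil
  end

  # Empties the buffer, making room for further messages.
  def deliver_messages
    @delivered.concat(@buffer)
    @buffer.clear
    nil
  end
end

# Flushes the buffer and retries once if the producer reports an overflow.
def produce_with_flush(producer, value, topic:)
  producer.produce(value, topic: topic)
rescue SketchBufferOverflow
  producer.deliver_messages
  producer.produce(value, topic: topic)
end

producer = BoundedProducer.new(max_buffer_size: 2)
3.times { |i| produce_with_flush(producer, "event-#{i}", topic: "events") }
producer.deliver_messages
```

Retrying only once keeps the sketch simple; a second overflow propagates to the caller.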