Method: Kafka::AsyncProducer#produce

Defined in:
lib/kafka/async_producer.rb

#produce(value, topic:, **options) ⇒ nil

Produces a message to the specified topic.

Parameters:

  • value (String)

    the message data.

  • key (String)

    the message key.

  • headers (Hash<String, String>)

    the headers for the message.

  • topic (String)

    the topic that the message should be written to.

  • partition (Integer)

    the partition that the message should be written to.

  • partition_key (String)

    the key that should be used to assign a partition.

  • create_time (Time)

    the timestamp that should be set on the message.

Returns:

  • (nil)

Raises:

See Also:



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/kafka/async_producer.rb', line 105

def produce(value, topic:, **options)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  ensure_threads_running!

  if @queue.size >= @max_queue_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
  end

  args = [value, **options.merge(topic: topic)]
  @queue << [:produce, args]

  @instrumenter.instrument("enqueue_message.async_producer", {
    topic: topic,
    queue_size: @queue.size,
    max_queue_size: @max_queue_size,
  })

  nil
end