Method: Kafka::AsyncProducer#produce
- Defined in: lib/kafka/async_producer.rb
#produce(value, topic:, **options) ⇒ nil
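Enqueues a message for asynchronous delivery to the specified topic and returns nil without waiting for delivery. The topic must be a String (it is converted with to_str so that other types fail fast). If the internal queue already holds max_queue_size messages, the buffer overflow handler is invoked.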
# File 'lib/kafka/async_producer.rb', line 105

def produce(value, topic:, **options)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  ensure_threads_running!

  if @queue.size >= @max_queue_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
  end

  args = [value, **options.merge(topic: topic)]
  @queue << [:produce, args]

  @instrumenter.instrument("enqueue_message.async_producer", {
    topic: topic,
    queue_size: @queue.size,
    max_queue_size: @max_queue_size,
  })

  nil
end
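To make the enqueue logic above easier to follow, here is a minimal stand-alone sketch that uses only Ruby's standard library. It is not the ruby-kafka implementation: `SketchAsyncProducer` and `SketchOverflow` are hypothetical names, the worker threads and instrumentation are left out, and treating overflow as a raised exception is an assumption made for the illustration.

```ruby
require "thread"

# Hypothetical error class for this sketch only.
class SketchOverflow < StandardError; end

# Hypothetical stand-in that mirrors the fail-fast topic check and the
# bounded queue shown in the source listing. No threads, no Kafka.
class SketchAsyncProducer
  attr_reader :queue

  def initialize(max_queue_size:)
    @max_queue_size = max_queue_size
    @queue = Queue.new # thread-safe FIFO
  end

  def produce(value, topic:, **options)
    # Unlike to_s, to_str is only defined on String-like objects, so a
    # Symbol or Integer topic raises NoMethodError at the call site.
    topic = topic.to_str

    if @queue.size >= @max_queue_size
      # Assumption: overflow is surfaced by raising.
      raise SketchOverflow,
        "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
    end

    # Enqueue a command tuple for a background worker to pick up later.
    @queue << [:produce, [value, options.merge(topic: topic)]]
    nil
  end
end

producer = SketchAsyncProducer.new(max_queue_size: 2)
producer.produce("a", topic: "greetings")
producer.produce("b", topic: "greetings", partition_key: "k")

begin
  producer.produce("c", topic: "greetings") # queue is full at this point
rescue SketchOverflow => e
  puts "overflow: #{e.message}"
end
```

Because the topic check runs before the queue-size check, a call such as `producer.produce("d", topic: :greetings)` fails with `NoMethodError` even when the queue is full, which keeps type mistakes from being hidden behind overflow errors.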