Class: Kafka::AsyncProducer

Inherits:
Object
Defined in:
lib/kafka/async_producer.rb

Overview

A Kafka producer that does all its work in the background so as not to block the calling thread. Calls to #deliver_messages are asynchronous and return immediately.

In addition, you can define automatic delivery policies. These place an upper bound on the number of buffered messages and on the time between message deliveries.

  • If delivery_threshold is set to a value n higher than zero, the producer will automatically deliver its messages once its buffer size reaches n.
  • If delivery_interval is set to a value n higher than zero, the producer will automatically deliver its messages every n seconds.

By default, automatic delivery is disabled and you'll have to call #deliver_messages manually.
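The threshold policy can be pictured with a small stand-alone sketch. ThresholdBuffer is a hypothetical class written for illustration, not part of ruby-kafka; it only assumes what is stated above, namely that a threshold greater than zero triggers delivery once the buffer reaches that size, and that zero means delivery is manual.

```ruby
# Hypothetical illustration of the delivery_threshold policy; not ruby-kafka code.
class ThresholdBuffer
  attr_reader :delivered_batches

  def initialize(delivery_threshold: 0)
    @delivery_threshold = delivery_threshold
    @buffer = []
    @delivered_batches = []
  end

  # Buffers a message and delivers automatically once the threshold is reached.
  def produce(value)
    @buffer << value
    deliver_messages if threshold_reached?
    nil
  end

  # Stands in for the real delivery to Kafka: records the batch and clears the buffer.
  def deliver_messages
    return if @buffer.empty?

    @delivered_batches << @buffer.dup
    @buffer.clear
    nil
  end

  def buffer_size
    @buffer.size
  end

  private

  # A threshold of zero disables automatic delivery.
  def threshold_reached?
    @delivery_threshold > 0 && @buffer.size >= @delivery_threshold
  end
end

auto = ThresholdBuffer.new(delivery_threshold: 3)
%w[a b c d].each { |message| auto.produce(message) }
# The first three messages were delivered as one batch; "d" is still buffered.

manual = ThresholdBuffer.new
manual.produce("x")
manual.deliver_messages # with the default threshold, delivery must be requested
```

The interval policy works along the same lines, except that the trigger is elapsed time rather than buffer size.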

Buffer Overflow and Backpressure

The calling thread communicates with the background thread doing the actual work through a thread-safe queue. While the background thread is busy delivering messages, new messages are buffered in the queue. To keep the queue from growing without bound when the background thread gets stuck or can't keep pace with the calling thread, there is a maximum number of messages that may be buffered. You can configure this limit by setting max_queue_size.

If you produce messages faster than the background producer thread can deliver them to Kafka, you will eventually fill the producer's buffer. When that happens, the background thread stops popping messages off the queue until it has successfully delivered the buffered messages. The queue therefore grows and may hit the max_queue_size limit, at which point calls to #produce raise a BufferOverflow error.

Depending on your use case, you may want to slow the rate at which messages are produced, or halt your application entirely until the producer has delivered the buffered messages and cleared the message queue.
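One way to slow down the calling side is to rescue the overflow error and retry after a pause. The sketch below is an assumption-laden illustration, not ruby-kafka code: BufferOverflow and BoundedProducer are local stand-ins so that the example runs without the gem or a broker, and produce_with_backoff is a hypothetical helper. With the real library you would rescue the library's own BufferOverflow error around producer.produce.

```ruby
# Stand-ins so the sketch runs without ruby-kafka; both names are hypothetical.
class BufferOverflow < StandardError; end

class BoundedProducer
  def initialize(max_queue_size:)
    @queue = Queue.new
    @max_queue_size = max_queue_size
  end

  # Enqueues a message, or raises when the queue is already full.
  def produce(value, topic:)
    if @queue.size >= @max_queue_size
      raise BufferOverflow, "max queue size (#{@max_queue_size} messages) reached"
    end

    @queue << [value, topic]
    nil
  end

  # Simulates the background thread catching up by removing up to `count` messages.
  def drain(count)
    count.times { @queue.pop(true) unless @queue.empty? }
  end

  def queue_size
    @queue.size
  end
end

# Retries a produce call when the queue is full, pausing between attempts.
# The optional block runs on every overflow, e.g. to log or to shed load.
# After max_attempts failures the BufferOverflow error is re-raised.
def produce_with_backoff(producer, value, topic:, max_attempts: 5, pause: 0.01)
  attempts = 0
  begin
    attempts += 1
    producer.produce(value, topic: topic)
  rescue BufferOverflow
    raise if attempts >= max_attempts

    yield attempts if block_given?
    sleep pause
    retry
  end
end

producer = BoundedProducer.new(max_queue_size: 2)
producer.produce("a", topic: "greetings")
producer.produce("b", topic: "greetings")

# The queue is full, so the first attempt overflows; the block frees one slot.
produce_with_backoff(producer, "c", topic: "greetings") { producer.drain(1) }
```

A fixed pause keeps the sketch short; whether to back off, drop messages, or stop the application is the use-case decision described above.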

Example

producer = kafka.async_producer(
  # Keep at most 1,000 messages in the buffer before delivering:
  delivery_threshold: 1000,

  # Deliver messages every 30 seconds:
  delivery_interval: 30,
)

# There's no need to call #deliver_messages manually; delivery happens
# automatically in the background.
producer.produce("hello", topic: "greetings")

# Remember to shut down the producer when you're done with it.
producer.shutdown

Defined Under Namespace

Classes: Timer, Worker

Constructor Details

#initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) ⇒ AsyncProducer

Initializes a new AsyncProducer.

Parameters:

  • sync_producer (Kafka::Producer)

    the synchronous producer that should be used in the background.

  • max_queue_size (Integer) (defaults to: 1000)

    the maximum number of messages allowed in the queue.

  • delivery_threshold (Integer) (defaults to: 0)

    if greater than zero, the number of buffered messages that will automatically trigger a delivery.

  • delivery_interval (Integer) (defaults to: 0)

    if greater than zero, the number of seconds between automatic message deliveries.

Raises:

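  • (ArgumentError)

    if max_queue_size is not greater than zero, or if delivery_threshold or delivery_interval is negative.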


# File 'lib/kafka/async_producer.rb', line 73

def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  raise ArgumentError unless max_queue_size > 0
  raise ArgumentError unless delivery_threshold >= 0
  raise ArgumentError unless delivery_interval >= 0

  @queue = Queue.new
  @max_queue_size = max_queue_size
  @instrumenter = instrumenter
  @logger = TaggedLogger.new(logger)

  @worker = Worker.new(
    queue: @queue,
    producer: sync_producer,
    delivery_threshold: delivery_threshold,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    instrumenter: instrumenter,
    logger: logger
  )

  # The timer will no-op if the delivery interval is zero.
  @timer = Timer.new(queue: @queue, interval: delivery_interval)

  @thread_mutex = Mutex.new
end
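
The Timer class itself is not shown on this page. As a rough sketch of what the comment in the constructor describes, a timer could sleep for the interval and then enqueue a delivery command, returning immediately when the interval is zero. SketchTimer below is a hypothetical stand-in built only on Ruby's standard Queue and Thread; the [:deliver_messages, nil] command shape is taken from #deliver_messages.

```ruby
# Hypothetical timer sketch; the real Kafka::AsyncProducer::Timer is not shown here.
class SketchTimer
  def initialize(queue:, interval:)
    @queue = queue
    @interval = interval
  end

  # Returns immediately when the interval is nil or zero. Otherwise loops
  # forever, enqueueing a delivery command after each sleep.
  def run
    return if @interval.nil? || @interval <= 0

    loop do
      sleep(@interval)
      @queue << [:deliver_messages, nil]
    end
  end
end

queue = Queue.new

# Interval zero: run returns at once and nothing is enqueued.
SketchTimer.new(queue: queue, interval: 0).run

# Positive interval: run in its own thread, since the loop never returns by itself.
timer_thread = Thread.new { SketchTimer.new(queue: queue, interval: 0.01).run }
command = queue.pop # blocks until the first tick arrives
timer_thread.exit   # stop the timer, as #shutdown does with its timer thread
```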

Instance Method Details

#deliver_messages ⇒ nil

Asynchronously delivers the buffered messages. This method will return immediately and the actual work will be done in the background.

Returns:

  • (nil)

# File 'lib/kafka/async_producer.rb', line 133

def deliver_messages
  ensure_threads_running!

  @queue << [:deliver_messages, nil]

  nil
end

#produce(value, topic:, **options) ⇒ nil

Produces a message to the specified topic.

Parameters:

  • value (String)

    the message data.

  • key (String)

    the message key.

  • headers (Hash<String, String>)

    the headers for the message.

  • topic (String)

    the topic that the message should be written to.

  • partition (Integer)

    the partition that the message should be written to.

  • partition_key (String)

    the key that should be used to assign a partition.

  • create_time (Time)

    the timestamp that should be set on the message.

Returns:

  • (nil)

Raises:

  • (BufferOverflow)

    if the message queue is full.

# File 'lib/kafka/async_producer.rb', line 105

def produce(value, topic:, **options)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  ensure_threads_running!

  if @queue.size >= @max_queue_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
  end

  args = [value, **options.merge(topic: topic)]
  @queue << [:produce, args]

  @instrumenter.instrument("enqueue_message.async_producer", {
    topic: topic,
    queue_size: @queue.size,
    max_queue_size: @max_queue_size,
  })

  nil
end
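
The fail-fast check in #produce relies on the difference between to_s and to_str in Ruby: nearly every object responds to to_s, but only string-like objects implement to_str. The snippet below illustrates this with a hypothetical helper, string_like?, which is not part of ruby-kafka.

```ruby
# Hypothetical helper: true when the object converts implicitly to a String.
def string_like?(topic)
  topic.to_str
  true
rescue NoMethodError
  false
end

string_like?("greetings") # String implements to_str
string_like?(:greetings)  # Symbol has to_s but no to_str
string_like?(nil)         # nil has to_s (an empty string) but no to_str

# Calling to_s instead would silently accept the Symbol and nil cases,
# so a mistyped topic would only surface later, in the background thread.
```

Because #produce calls to_str before enqueueing, a non-String topic raises NoMethodError in the calling thread rather than in the worker.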

#shutdown ⇒ nil

Shuts down the producer, releasing the network resources used. This method will block until the buffered messages have been delivered.

Returns:

  • (nil)

# File 'lib/kafka/async_producer.rb', line 146

def shutdown
  ensure_threads_running!

  @timer_thread && @timer_thread.exit
  @queue << [:shutdown, nil]
  @worker_thread && @worker_thread.join

  nil
end
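
All three methods above talk to the background worker by pushing [operation, payload] pairs onto the queue. The Worker class is not shown on this page, so the following is only a sketch of how such a command loop might look. SketchWorker is hypothetical, and its "delivery" just moves messages into an array instead of sending them to Kafka.

```ruby
# Hypothetical worker sketch; the real Kafka::AsyncProducer::Worker is not shown here.
class SketchWorker
  attr_reader :delivered

  def initialize(queue:)
    @queue = queue
    @buffer = []
    @delivered = []
  end

  # Processes commands until :shutdown is received.
  def run
    loop do
      operation, payload = @queue.pop # blocks until a command arrives

      case operation
      when :produce
        @buffer << payload
      when :deliver_messages
        flush
      when :shutdown
        flush # deliver whatever is still buffered, then stop
        break
      else
        raise ArgumentError, "unknown operation: #{operation.inspect}"
      end
    end
  end

  private

  # Stands in for delivery to Kafka.
  def flush
    @delivered.concat(@buffer)
    @buffer.clear
  end
end

queue = Queue.new
worker = SketchWorker.new(queue: queue)
worker_thread = Thread.new { worker.run }

queue << [:produce, ["hello", { topic: "greetings" }]]
queue << [:deliver_messages, nil]
queue << [:produce, ["bye", { topic: "greetings" }]]
queue << [:shutdown, nil]
worker_thread.join # returns once the worker has processed :shutdown
```

Joining the worker thread after enqueueing :shutdown mirrors what #shutdown does, which is why that method blocks until the buffered messages have been delivered.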