Method: Kafka::AsyncProducer#initialize

Defined in:
lib/kafka/async_producer.rb

#initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1,, retry_backoff: 0, instrumenter:, logger:) ⇒ AsyncProducer

Initializes a new AsyncProducer.

Parameters:

  • sync_producer (Kafka::Producer)

    the synchronous producer that should be used in the background.

  • max_queue_size (Integer) (defaults to: 1000)

    the maximum number of messages allowed in the queue.

  • delivery_threshold (Integer) (defaults to: 0)

    if greater than zero, the number of buffered messages that will automatically trigger a delivery.

  • delivery_interval (Integer) (defaults to: 0)

    if greater than zero, the number of seconds between automatic message deliveries.

Raises:

  • (ArgumentError)


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/kafka/async_producer.rb', line 73

def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  raise ArgumentError unless max_queue_size > 0
  raise ArgumentError unless delivery_threshold >= 0
  raise ArgumentError unless delivery_interval >= 0

  @queue = Queue.new
  @max_queue_size = max_queue_size
  @instrumenter = instrumenter
  @logger = TaggedLogger.new(logger)

  @worker = Worker.new(
    queue: @queue,
    producer: sync_producer,
    delivery_threshold: delivery_threshold,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    instrumenter: instrumenter,
    logger: logger
  )

  # The timer will no-op if the delivery interval is zero.
  @timer = Timer.new(queue: @queue, interval: delivery_interval)

  @thread_mutex = Mutex.new
end