Method: Kafka::AsyncProducer#initialize
- Defined in:
- lib/kafka/async_producer.rb
#initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1,, retry_backoff: 0, instrumenter:, logger:) ⇒ AsyncProducer
Initializes a new AsyncProducer.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/kafka/async_producer.rb', line 73 def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 @queue = Queue.new @max_queue_size = max_queue_size @instrumenter = instrumenter @logger = TaggedLogger.new(logger) @worker = Worker.new( queue: @queue, producer: sync_producer, delivery_threshold: delivery_threshold, max_retries: max_retries, retry_backoff: retry_backoff, instrumenter: instrumenter, logger: logger ) # The timer will no-op if the delivery interval is zero. @timer = Timer.new(queue: @queue, interval: delivery_interval) @thread_mutex = Mutex.new end |