Class: WaterDrop::AsyncProducer

Inherits:
BaseProducer
Defined in:
lib/water_drop/async_producer.rb

Overview

Producer that delivers messages to Kafka asynchronously.

Class Method Summary

  • .call(message, options) ⇒ Object

    Performs message delivery using the deliver_async method.

Class Method Details

.call(message, options) ⇒ Object

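Delivers the message through DeliveryBoy, using deliver_async! when WaterDrop.config.raise_on_buffer_overflow is set and deliver_async otherwise. Returns without sending anything when WaterDrop.config.deliver is false. On Kafka::Error, the call is retried for as long as graceful_attempt? allows; after that the error is re-raised.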

Parameters:

  • message (String)

    message that we want to send to Kafka

  • options (Hash)

    options (including topic) for producer

Raises:

  • (Kafka::Error)

    re-raised when delivery fails and graceful_attempt? does not allow another retry

# File 'lib/water_drop/async_producer.rb', line 12

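def self.call(message, options)
  # Locals survive `retry`, so the counter is initialized only on the first pass
  attempts_count ||= 0
  attempts_count += 1

  validate!(options)
  # Skip delivery entirely when it is disabled in the configuration
  return unless WaterDrop.config.deliver

  # The bang variant is selected when raise_on_buffer_overflow is enabled
  d_method = WaterDrop.config.raise_on_buffer_overflow ? :deliver_async! : :deliver_async

  DeliveryBoy.send(d_method, message, options)
rescue Kafka::Error => error
  # Retry while graceful_attempt? allows it, otherwise re-raise the error
  graceful_attempt?(attempts_count, message, options, error) ? retry : raise(error)
end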
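
The retry flow in the method above depends on a Ruby detail: `retry` re-runs the method body, but local variables keep their values, so `attempts_count ||= 0` initializes the counter only once. The sketch below isolates that idiom so it can be run without Kafka. Everything in it is a hypothetical stand-in and not part of WaterDrop: `FakeDeliveryError` plays the role of `Kafka::Error`, `FlakyClient` plays the role of `DeliveryBoy`, and the fixed `MAX_ATTEMPTS` limit is an assumption standing in for whatever `graceful_attempt?` actually checks.

```ruby
# Hypothetical stand-in for Kafka::Error, so the sketch runs without any gems.
class FakeDeliveryError < StandardError; end

# Hypothetical stand-in for DeliveryBoy: fails a fixed number of times, then succeeds.
class FlakyClient
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def deliver_async(message, options)
    @calls += 1
    raise FakeDeliveryError, 'buffer unavailable' if @calls <= @failures

    "#{options[:topic]}: #{message}"
  end
end

# Assumed retry limit for illustration; not taken from WaterDrop.
MAX_ATTEMPTS = 3

def deliver_with_retry(client, message, options)
  # `retry` re-runs the method body, but locals keep their values,
  # so `||=` sets the counter to 0 only on the first pass.
  attempts_count ||= 0
  attempts_count += 1

  client.deliver_async(message, options)
rescue FakeDeliveryError => error
  # Retry while attempts remain, otherwise re-raise the original error.
  attempts_count < MAX_ATTEMPTS ? retry : raise(error)
end
```

With a client that fails twice, `deliver_with_retry(FlakyClient.new(2), 'hello', { topic: 'events' })` succeeds on the third attempt. With a client that fails more than `MAX_ATTEMPTS` times, the error propagates to the caller after the third attempt.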