Module: WaterDrop::Producer::Async

Included in:
WaterDrop::Producer
Defined in:
lib/water_drop/producer/async.rb

Overview

Component for asynchronous producer operations

Instance Method Details

#produce_async(message) ⇒ Rdkafka::Producer::DeliveryHandle

Produces a message to Kafka without waiting for the delivery result

Parameters:

  • message (Hash)

    hash that complies with the Contracts::Message contract

Returns:

  • (Rdkafka::Producer::DeliveryHandle)

    delivery handle that can be waited on to obtain the delivery report

Raises:

  • (Rdkafka::RdkafkaError)

    When adding the message to rdkafka’s queue fails

  • (Errors::MessageInvalidError)

    When the provided message details are invalid and the message cannot be sent to Kafka



# File 'lib/water_drop/producer/async.rb', line 16

def produce_async(message)
  ensure_active!
  validate_message!(message)

  @monitor.instrument(
    'message.produced_async',
    producer: self,
    message: message
  ) { client.produce(**message) }
end
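
The listing above validates the message first and only then hands it to the client, returning a handle straight away. The sketch below imitates that order with hypothetical stand-ins so it runs without Kafka: FakeHandle, FakeClient, SketchProducer and InvalidMessage are illustrative names, not WaterDrop or rdkafka classes, and the "topic and payload present" check is a simplification of the real message contract.

```ruby
# Hypothetical stand-in for a delivery handle; not the rdkafka class.
class FakeHandle
  def initialize(payload)
    @payload = payload
  end

  # Imitates blocking until the delivery result is known.
  def wait
    "delivered: #{@payload}"
  end
end

# Hypothetical stand-in for the client used by the producer.
class FakeClient
  attr_reader :queue

  def initialize
    @queue = []
  end

  # Takes keyword arguments, matching the `client.produce(**message)` call shape.
  def produce(topic:, payload:, **_rest)
    @queue << [topic, payload]
    FakeHandle.new(payload)
  end
end

class SketchProducer
  InvalidMessage = Class.new(StandardError)

  def initialize(client)
    @client = client
  end

  def produce_async(message)
    # Simplified validation (an assumption): runs before anything is enqueued.
    raise InvalidMessage, 'topic and payload are required' unless message[:topic] && message[:payload]

    @client.produce(**message)
  end
end

client = FakeClient.new
producer = SketchProducer.new(client)

handle = producer.produce_async({ topic: 'events', payload: 'hello' })
# The call has already returned; waiting on the handle is optional.
result = handle.wait
```

In this sketch the caller decides whether to call wait at all, which is the point of the asynchronous variant: the producing code path is not held up by delivery.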

#produce_many_async(messages) ⇒ Array<Rdkafka::Producer::DeliveryHandle>

Produces many messages to Kafka and does not wait for them to be delivered

Parameters:

  • messages (Array<Hash>)

    array with messages that comply with the Contracts::Message contract

Returns:

  • (Array<Rdkafka::Producer::DeliveryHandle>)

    delivery handles, one per message

Raises:

  • (Rdkafka::RdkafkaError)

    When adding the messages to rdkafka’s queue failed

  • (Errors::MessageInvalidError)

    When the details of any provided message are invalid and the messages cannot be sent to Kafka



# File 'lib/water_drop/producer/async.rb', line 37

def produce_many_async(messages)
  ensure_active!
  messages.each { |message| validate_message!(message) }

  @monitor.instrument(
    'messages.produced_async',
    producer: self,
    messages: messages
  ) do
    messages.map { |message| client.produce(**message) }
  end
end
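
Because the listing above validates every message before the first call to client.produce, a single invalid message in the batch means nothing from that batch is enqueued. A minimal sketch of that ordering follows. RecordingClient, InvalidMessage and produce_many_sketch are hypothetical names used only for illustration, and the validation rule is a simplified assumption rather than the actual Contracts::Message contract.

```ruby
# Hypothetical stand-in for the client; records what would be enqueued.
class RecordingClient
  attr_reader :queue

  def initialize
    @queue = []
  end

  def produce(topic:, payload:)
    @queue << [topic, payload]
    :"handle_#{@queue.size}" # placeholder symbol instead of a real delivery handle
  end
end

InvalidMessage = Class.new(StandardError)

# Imitates the validate-all-then-enqueue order of produce_many_async.
def produce_many_sketch(client, messages)
  messages.each do |message|
    # Simplified validation (an assumption): topic and payload must be present.
    raise InvalidMessage, "invalid: #{message.inspect}" unless message[:topic] && message[:payload]
  end

  messages.map { |message| client.produce(**message) }
end

client = RecordingClient.new

# A fully valid batch: one handle is returned per message.
handles = produce_many_sketch(client, [
  { topic: 'events', payload: 'a' },
  { topic: 'events', payload: 'b' }
])

# A batch with one invalid message: the valid first message is not enqueued either.
rejected = begin
  produce_many_sketch(client, [{ topic: 'events', payload: 'c' }, { topic: 'events' }])
  false
rescue InvalidMessage
  true
end
```

Note that this all-or-nothing behavior covers validation only. Once enqueueing has started, an error raised partway through the map would leave the earlier messages in the queue.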