Module: WaterDrop::Producer::Sync
- Included in: WaterDrop::Producer
- Defined in: lib/water_drop/producer/sync.rb
Overview
Component that provides the producer's synchronous operations.
Instance Method Summary
- #produce_many_sync(messages) ⇒ Array<Rdkafka::Producer::DeliveryReport>
  Produces many messages to Kafka and waits for them to be delivered.
- #produce_sync(message) ⇒ Rdkafka::Producer::DeliveryReport
  Produces a message to Kafka and waits for it to be delivered.
Instance Method Details
#produce_many_sync(messages) ⇒ Array<Rdkafka::Producer::DeliveryReport>
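Produces many messages to Kafka and waits for all of them to be delivered. Every message is dispatched first; the call then blocks on each delivery handle in turn and returns the delivery reports in the same order as the input messages.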
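    # File 'lib/water_drop/producer/sync.rb', line 48

    def produce_many_sync(messages)
      ensure_active!
      # Validate every message before anything is dispatched
      messages.each { |message| validate_message!(message) }

      @monitor.instrument('messages.produced_sync', producer: self, messages: messages) do
        messages
          # First pass: dispatch all messages and collect the delivery handles
          .map { |message| client.produce(**message) }
          # Second pass: block on each handle until its delivery report arrives
          .map! do |handler|
            handler.wait(
              max_wait_timeout: @config.max_wait_timeout,
              wait_timeout: @config.wait_timeout
            )
          end
      end
    end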
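The two-pass shape of this method (dispatch everything, then wait on each handle) can be illustrated without a Kafka cluster. The following is a minimal sketch, not WaterDrop's implementation: `FakeClient`, `FakeHandle`, `FakeReport`, and `produce_many_sync_sketch` are hypothetical stand-ins invented for the illustration, and the timeout values are arbitrary.

```ruby
# Hypothetical stand-ins for the Kafka client and its delivery handles.
# None of these classes are part of WaterDrop or rdkafka.
FakeReport = Struct.new(:partition, :offset)

class FakeHandle
  def initialize(offset, log)
    @offset = offset
    @log = log
  end

  # Mimics a blocking wait that returns a delivery report.
  def wait(max_wait_timeout:, wait_timeout:)
    @log << "wait #{@offset}"
    FakeReport.new(0, @offset)
  end
end

class FakeClient
  attr_reader :log

  def initialize
    @log = []
    @next_offset = 0
  end

  # Mimics an asynchronous dispatch that returns a handle immediately.
  def produce(topic:, payload:)
    @log << "produce #{payload}"
    handle = FakeHandle.new(@next_offset, @log)
    @next_offset += 1
    handle
  end
end

# Same two-pass shape as produce_many_sync: dispatch all, then wait on each.
def produce_many_sync_sketch(client, messages)
  messages
    .map { |message| client.produce(**message) }
    .map! { |handle| handle.wait(max_wait_timeout: 5, wait_timeout: 0.005) }
end

client = FakeClient.new
messages = [
  { topic: 'events', payload: 'a' },
  { topic: 'events', payload: 'b' }
]
reports = produce_many_sync_sketch(client, messages)

puts client.log.inspect            # both dispatches are logged before any wait
puts reports.map(&:offset).inspect # one report per message, in input order
```

Because every message is dispatched before the first wait begins, the total blocking time in the sketch is bounded by the slowest delivery rather than the sum of all deliveries, which is presumably the motivation for the two passes in the real method.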
#produce_sync(message) ⇒ Rdkafka::Producer::DeliveryReport
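Produces a message to Kafka and waits for it to be delivered. The call blocks on the delivery handle, using the producer's configured max_wait_timeout and wait_timeout, and returns the resulting delivery report.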
    # File 'lib/water_drop/producer/sync.rb', line 18

    def produce_sync(message)
      ensure_active!
      # Validate the message before it is dispatched
      validate_message!(message)

      @monitor.instrument(
        'message.produced_sync',
        producer: self,
        message: message
      ) do
        client
          .produce(**message)
          # Block until the delivery report for this message arrives
          .wait(
            max_wait_timeout: @config.max_wait_timeout,
            wait_timeout: @config.wait_timeout
          )
      end
    end
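To make the control flow concrete, here is a minimal sketch of the produce-then-wait pattern wrapped in an instrumentation block. It runs without Kafka and is not WaterDrop's code: `StubClient`, `StubHandle`, `StubReport`, `StubMonitor`, and `produce_sync_sketch` are hypothetical names, the partition and offset values are made up, and the timeouts are arbitrary.

```ruby
# Hypothetical stubs for illustration only; not part of WaterDrop or rdkafka.
StubReport = Struct.new(:partition, :offset)

class StubHandle
  # Mimics a blocking wait that returns a delivery report.
  def wait(max_wait_timeout:, wait_timeout:)
    StubReport.new(0, 42)
  end
end

class StubClient
  # Mimics an asynchronous dispatch that returns a handle right away.
  def produce(topic:, payload:)
    StubHandle.new
  end
end

class StubMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  # Runs the block, records the event name, and passes the block's value through.
  def instrument(name, payload = {})
    result = yield
    @events << name
    result
  end
end

# Same shape as produce_sync: instrument, dispatch one message, wait for its report.
def produce_sync_sketch(client, monitor, message)
  monitor.instrument('message.produced_sync', message: message) do
    client
      .produce(**message)
      .wait(max_wait_timeout: 5, wait_timeout: 0.005)
  end
end

monitor = StubMonitor.new
report = produce_sync_sketch(
  StubClient.new,
  monitor,
  { topic: 'events', payload: 'hello' }
)

puts "partition=#{report.partition} offset=#{report.offset}"
puts monitor.events.inspect
```

The value of the instrumented block is what the method returns, so the caller receives the delivery report directly and can read fields such as the partition and offset from it.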