Class: Rdkafka::Producer::DeliveryHandle
- Inherits: FFI::Struct
  - Object
  - FFI::Struct
  - Rdkafka::Producer::DeliveryHandle
- Defined in: lib/rdkafka/producer/delivery_handle.rb
Overview
Handle to wait for a delivery report which is returned when producing a message.
Defined Under Namespace
Classes: WaitTimeoutError
Instance Method Summary
- #pending? ⇒ Boolean
  Whether the delivery handle is still pending.
- #wait(timeout_in_seconds = 60) ⇒ DeliveryReport
  Wait for the delivery report or raise an error if this takes longer than the timeout.
Instance Method Details
#pending? ⇒ Boolean
Whether the delivery handle is still pending.
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 14
    def pending?
      self[:pending]
    end
#wait(timeout_in_seconds = 60) ⇒ DeliveryReport
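Wait for the delivery report or raise an error if this takes longer than the timeout. A timeout does not mean the message was not delivered; rdkafka might still be working on delivering it. In that case it is possible to call wait again. Passing nil as timeout_in_seconds waits without a time limit. Raises WaitTimeoutError if the timeout is reached while the handle is still pending, and RdkafkaError if the delivery response is non-zero.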
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 28
    def wait(timeout_in_seconds=60)
      timeout = if timeout_in_seconds
        Time.now.to_i + timeout_in_seconds
      else
        nil
      end
      loop do
        if pending?
          if timeout && timeout <= Time.now.to_i
            raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
          end
          sleep 0.1
          next
        elsif self[:response] != 0
          raise RdkafkaError.new(self[:response])
        else
          return DeliveryReport.new(self[:partition], self[:offset])
        end
      end
    end
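Because a timeout does not rule out a later delivery, one possible pattern is to call wait again a bounded number of times. The following is a minimal sketch of that idea, not part of the library: FakeDeliveryHandle is a hypothetical stand-in so the example runs without the rdkafka gem or a broker, and wait_with_retries, max_attempts, and the timeout_error parameter are illustrative names. With a real handle, the error class to pass would presumably be Rdkafka::Producer::DeliveryHandle::WaitTimeoutError.

```ruby
# Hypothetical stand-in for Rdkafka::Producer::DeliveryHandle.
# It only mimics the pending?/wait behavior described above.
class FakeDeliveryHandle
  class WaitTimeoutError < RuntimeError; end
  Report = Struct.new(:partition, :offset)

  # timeouts_before_delivery: how many wait calls time out before "delivery".
  def initialize(timeouts_before_delivery)
    @remaining = timeouts_before_delivery
  end

  def pending?
    @remaining > 0
  end

  # Simulates a timeout on each call until the simulated delivery completes.
  def wait(timeout_in_seconds = 60)
    if pending?
      @remaining -= 1
      raise WaitTimeoutError, "Waiting for delivery timed out after #{timeout_in_seconds} seconds"
    end
    Report.new(0, 42)
  end
end

# Hypothetical helper: call wait again after a timeout, up to max_attempts
# times, then re-raise the last timeout error.
def wait_with_retries(handle, timeout_error:, timeout_in_seconds: 5, max_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    handle.wait(timeout_in_seconds)
  rescue timeout_error
    retry if attempts < max_attempts
    raise
  end
end

handle = FakeDeliveryHandle.new(2)
report = wait_with_retries(handle, timeout_error: FakeDeliveryHandle::WaitTimeoutError)
puts "partition=#{report.partition} offset=#{report.offset}"
```

Bounding the number of attempts keeps the caller from blocking forever while still allowing for a slow delivery. An RdkafkaError raised by wait is not rescued here and propagates to the caller.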