Class: Rdkafka::Producer::DeliveryHandle
- Inherits:
-
FFI::Struct
- Object
- FFI::Struct
- Rdkafka::Producer::DeliveryHandle
- Defined in:
- lib/rdkafka/producer/delivery_handle.rb
Overview
Handle to wait for a delivery report which is returned when producing a message.
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary collapse
- REGISTRY =
{}
Class Method Summary collapse
Instance Method Summary collapse
-
#pending? ⇒ Boolean
Whether the delivery handle is still pending.
-
#wait(max_wait_timeout: 60, wait_timeout: 0.1) ⇒ DeliveryReport
Wait for the delivery report or raise an error if this takes longer than the timeout.
Class Method Details
.register(address, handle) ⇒ Object
17 18 19 |
# File 'lib/rdkafka/producer/delivery_handle.rb', line 17 def self.register(address, handle) REGISTRY[address] = handle end |
.remove(address) ⇒ Object
21 22 23 |
# File 'lib/rdkafka/producer/delivery_handle.rb', line 21 def self.remove(address) REGISTRY.delete(address) end |
Instance Method Details
#pending? ⇒ Boolean
Whether the delivery handle is still pending.
28 29 30 |
# File 'lib/rdkafka/producer/delivery_handle.rb', line 28 def pending? self[:pending] end |
#wait(max_wait_timeout: 60, wait_timeout: 0.1) ⇒ DeliveryReport
Wait for the delivery report or raise an error if this takes longer than the timeout. If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message. In this case it is possible to call wait again.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rdkafka/producer/delivery_handle.rb', line 43 def wait(max_wait_timeout: 60, wait_timeout: 0.1) timeout = if max_wait_timeout CURRENT_TIME.call + max_wait_timeout else nil end loop do if pending? if timeout && timeout <= CURRENT_TIME.call raise WaitTimeoutError.new("Waiting for delivery timed out after #{max_wait_timeout} seconds") end sleep wait_timeout elsif self[:response] != 0 raise RdkafkaError.new(self[:response]) else return DeliveryReport.new(self[:partition], self[:offset]) end end end |