Class: Rdkafka::Producer::DeliveryHandle
- Inherits: FFI::Struct
  - Object
  - FFI::Struct
  - Rdkafka::Producer::DeliveryHandle
- Defined in: lib/rdkafka/producer/delivery_handle.rb
Overview
Handle to wait for a delivery report which is returned when producing a message.
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary
- REGISTRY = {}
Class Method Summary
- .register(address, handle) ⇒ Object
- .remove(address) ⇒ Object
Instance Method Summary
- #pending? ⇒ Boolean
  Whether the delivery handle is still pending.
- #wait(timeout_in_seconds = 60) ⇒ DeliveryReport
  Wait for the delivery report or raise an error if this takes longer than the timeout.
Class Method Details
.register(address, handle) ⇒ Object
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 13
    def self.register(address, handle)
      REGISTRY[address] = handle
    end
.remove(address) ⇒ Object
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 17
    def self.remove(address)
      REGISTRY.delete(address)
    end
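The source only shows that REGISTRY maps an address to a handle. One plausible reading is that a native delivery callback uses that address to find the matching handle and mark it as done. The sketch below illustrates that pattern in plain Ruby, without FFI or rdkafka. `SketchHandle`, `on_delivery`, and the use of `object_id` as a stand-in for a pointer address are all assumptions made for illustration, not part of the library.

```ruby
# Plain-Ruby stand-in for the registry pattern; not the real DeliveryHandle.
class SketchHandle
  REGISTRY = {}

  attr_accessor :pending, :offset

  # Remember the handle under its address so a callback can find it later.
  def self.register(address, handle)
    REGISTRY[address] = handle
  end

  # Forget the handle and return it (nil if the address is unknown).
  def self.remove(address)
    REGISTRY.delete(address)
  end
end

handle = SketchHandle.new
handle.pending = true
address = handle.object_id # assumption: stands in for a native pointer address
SketchHandle.register(address, handle)

# Hypothetical callback: look the handle up by address and complete it.
on_delivery = lambda do |addr, offset|
  found = SketchHandle.remove(addr)
  if found
    found.offset = offset
    found.pending = false
  end
  found
end

on_delivery.call(address, 7)
```

Removing the entry inside the callback keeps the registry from growing with handles that have already completed.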
Instance Method Details
#pending? ⇒ Boolean
Whether the delivery handle is still pending.
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 24
    def pending?
      self[:pending]
    end
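Because #pending? returns immediately, it can be used to check on a batch of handles without blocking. The following is a minimal sketch of that idea; `PollHandle` is a hypothetical stand-in that only mimics the #pending? method, so the example runs without a broker.

```ruby
# Hypothetical stand-in exposing only #pending?, like a delivery handle would.
PollHandle = Struct.new(:name, :pending) do
  def pending?
    pending
  end
end

handles = [
  PollHandle.new("a", true),
  PollHandle.new("b", false),
  PollHandle.new("c", true)
]

# Split the batch into handles still in flight and handles that are done.
in_flight, finished = handles.partition(&:pending?)
```

With real handles, calling #wait on the finished ones would be expected to return (or raise) without sleeping, since #wait only sleeps while the handle is pending.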
#wait(timeout_in_seconds = 60) ⇒ DeliveryReport
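Wait for the delivery report, or raise a WaitTimeoutError if this takes longer than the timeout. A timeout does not mean the message was not delivered; rdkafka might still be working on delivering it. In that case it is possible to call wait again. Passing nil as timeout_in_seconds disables the timeout, so the call waits until the handle is no longer pending. If the delivery completed with a non-zero response code, an RdkafkaError is raised instead of returning a report.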
    # File 'lib/rdkafka/producer/delivery_handle.rb', line 38
    def wait(timeout_in_seconds=60)
      timeout = if timeout_in_seconds
        Time.now.to_i + timeout_in_seconds
      else
        nil
      end
      loop do
        if pending?
          if timeout && timeout <= Time.now.to_i
            raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
          end
          sleep 0.1
          next
        elsif self[:response] != 0
          raise RdkafkaError.new(self[:response])
        else
          return DeliveryReport.new(self[:partition], self[:offset])
        end
      end
    end
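Since a timeout leaves the handle usable, a caller can wait again a bounded number of times. The sketch below shows one way to do that. `wait_with_retries` and `StubHandle` are hypothetical names introduced here. The stub exists only so the example runs without a broker. With the real gem, the `timeout_error:` argument would be Rdkafka::Producer::DeliveryHandle::WaitTimeoutError.

```ruby
# Hypothetical helper (not part of rdkafka): calls #wait again after a timeout.
# handle        - any object responding to #wait(timeout_in_seconds)
# timeout_error - the error class that signals a timeout
def wait_with_retries(handle, timeout_error:, attempts: 3, timeout_in_seconds: 5)
  tries = 0
  begin
    tries += 1
    handle.wait(timeout_in_seconds)
  rescue timeout_error
    # A timeout does not mean the message was lost, so waiting again is safe.
    retry if tries < attempts
    raise
  end
end

# Stand-in handle that times out a fixed number of times, then succeeds.
class StubHandle
  class WaitTimeoutError < RuntimeError; end

  def initialize(timeouts_before_success)
    @remaining = timeouts_before_success
  end

  def wait(_timeout_in_seconds = 60)
    if @remaining > 0
      @remaining -= 1
      raise WaitTimeoutError, "Waiting for delivery timed out"
    end
    { partition: 0, offset: 42 } # stands in for a DeliveryReport
  end
end

report = wait_with_retries(StubHandle.new(2), timeout_error: StubHandle::WaitTimeoutError)
```

Other errors, such as an RdkafkaError raised for a failed delivery, are deliberately not rescued: they indicate a final result rather than a slow one.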