Class: Rdkafka::Producer::DeliveryHandle

Inherits:
FFI::Struct
  • Object
Defined in:
lib/rdkafka/producer/delivery_handle.rb

Overview

Handle returned when producing a message. Use it to wait for the delivery report.

Defined Under Namespace

Classes: WaitTimeoutError

Constant Summary

REGISTRY = {}

Class Method Summary

Instance Method Summary

Class Method Details

.register(address, handle) ⇒ Object


# File 'lib/rdkafka/producer/delivery_handle.rb', line 13

def self.register(address, handle)
  REGISTRY[address] = handle
end

.remove(address) ⇒ Object


# File 'lib/rdkafka/producer/delivery_handle.rb', line 17

def self.remove(address)
  REGISTRY.delete(address)
end

Instance Method Details

#pending? ⇒ Boolean

Whether the delivery handle is still pending.


# File 'lib/rdkafka/producer/delivery_handle.rb', line 24

def pending?
  self[:pending]
end

#wait(timeout_in_seconds = 60) ⇒ DeliveryReport

Waits for the delivery report and raises an error if this takes longer than the timeout. A timeout does not mean the message was not delivered; rdkafka might still be working on delivering it. In that case, wait can be called again. Passing nil as the timeout waits indefinitely.

Raises:

  • (RdkafkaError)

    When delivering the message failed

  • (WaitTimeoutError)

    When the timeout has been reached and the handle is still pending


# File 'lib/rdkafka/producer/delivery_handle.rb', line 38

def wait(timeout_in_seconds=60)
  timeout = if timeout_in_seconds
              Time.now.to_i + timeout_in_seconds
            else
              nil
            end
  loop do
    if pending?
      if timeout && timeout <= Time.now.to_i
        raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
      end
      sleep 0.1
      next
    elsif self[:response] != 0
      raise RdkafkaError.new(self[:response])
    else
      return DeliveryReport.new(self[:partition], self[:offset])
    end
  end
end
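
Because a timeout does not mean delivery failed, a caller may want to call wait again rather than give up. The following is a minimal sketch of that pattern, not part of rdkafka: wait_with_retries is a hypothetical helper, and FakeHandle and the top-level WaitTimeoutError are stand-ins so the example runs without the gem. With the real library, the handle would be the object returned when producing a message, and the error class would be Rdkafka::Producer::DeliveryHandle::WaitTimeoutError.

```ruby
# Stand-in for Rdkafka::Producer::DeliveryHandle::WaitTimeoutError so this
# sketch runs without the rdkafka gem (assumption for illustration only).
class WaitTimeoutError < RuntimeError; end

# Hypothetical helper (not part of rdkafka): calls handle.wait up to
# `attempts` times, retrying only when the wait timed out. Any other
# error, such as a delivery failure, propagates immediately.
def wait_with_retries(handle, attempts: 3, timeout_in_seconds: 5,
                      timeout_error: WaitTimeoutError)
  tries = 0
  begin
    tries += 1
    handle.wait(timeout_in_seconds)
  rescue timeout_error
    retry if tries < attempts
    raise
  end
end

# Fake handle for demonstration: times out a fixed number of times
# before returning a report-like value.
class FakeHandle
  def initialize(timeouts_before_success)
    @remaining = timeouts_before_success
  end

  def wait(_timeout_in_seconds = 60)
    if @remaining > 0
      @remaining -= 1
      raise WaitTimeoutError, "Waiting for delivery timed out"
    end
    { partition: 0, offset: 42 }
  end
end

# Two timeouts, then success on the third attempt.
report = wait_with_retries(FakeHandle.new(2), attempts: 3, timeout_in_seconds: 1)
puts report.inspect
```

Bounding the number of attempts keeps the caller from blocking forever, while rescuing only the timeout error ensures a genuine delivery failure is still surfaced on the first occurrence.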