Class: Rdkafka::Producer::DeliveryHandle

Inherits:
FFI::Struct
  • Object
Defined in:
lib/rdkafka/producer/delivery_handle.rb

Overview

Handle to wait for a delivery report which is returned when producing a message.

Defined Under Namespace

Classes: WaitTimeoutError

Instance Method Details

#pending? ⇒ Boolean

Whether the delivery handle is still pending.

Returns:

  • (Boolean)


# File 'lib/rdkafka/producer/delivery_handle.rb', line 14

def pending?
  self[:pending]
end

#wait(timeout_in_seconds = 60) ⇒ DeliveryReport

Wait for the delivery report, or raise an error if this takes longer than the timeout. A timeout does not mean the message was not delivered; rdkafka might still be working on delivering it. In that case, it is possible to call wait again.

Parameters:

  • timeout_in_seconds (Integer, nil) (defaults to: 60)

    Number of seconds to wait before timing out. If this is nil it does not time out.

Returns:

  • (DeliveryReport)

Raises:

  • (RdkafkaError)

    When delivering the message failed

  • (WaitTimeoutError)

    When the timeout has been reached and the handle is still pending



# File 'lib/rdkafka/producer/delivery_handle.rb', line 28

def wait(timeout_in_seconds=60)
  timeout = if timeout_in_seconds
              Time.now.to_i + timeout_in_seconds
            else
              nil
            end
  loop do
    if pending?
      if timeout && timeout <= Time.now.to_i
        raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
      end
      sleep 0.1
      next
    elsif self[:response] != 0
      raise RdkafkaError.new(self[:response])
    else
      return DeliveryReport.new(self[:partition], self[:offset])
    end
  end
end
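
The note above that wait can be called again after a timeout could be used roughly as follows. This is a minimal sketch, not part of the library: FakeHandle is a hypothetical stand-in for a delivery handle so the example runs without a Kafka broker, its hash return value is a placeholder for a DeliveryReport, and wait_with_retries, max_attempts and pending_for are names invented for illustration.

```ruby
# Hypothetical stand-in for a delivery handle so the sketch runs without
# a broker. It only mimics the documented contract of #pending? and #wait.
class FakeHandle
  class WaitTimeoutError < RuntimeError; end

  def initialize(pending_for)
    @pending_for = pending_for # number of wait calls that will time out
  end

  def pending?
    @pending_for > 0
  end

  # Raises on timeout while pending, otherwise returns a report.
  def wait(timeout_in_seconds = 60)
    if pending?
      @pending_for -= 1
      raise WaitTimeoutError, "Waiting for delivery timed out after #{timeout_in_seconds} seconds"
    end
    { partition: 0, offset: 42 } # placeholder for a DeliveryReport
  end
end

# Hypothetical helper: call wait again after a timeout, up to max_attempts
# times, then let the last timeout error propagate to the caller.
def wait_with_retries(handle, max_attempts: 3, timeout_in_seconds: 1)
  attempts = 0
  begin
    attempts += 1
    handle.wait(timeout_in_seconds)
  rescue FakeHandle::WaitTimeoutError
    retry if attempts < max_attempts
    raise
  end
end

report = wait_with_retries(FakeHandle.new(2))
puts report.inspect
```

With a real handle, the rescue clause would presumably name the WaitTimeoutError class listed under "Defined Under Namespace" instead of the stand-in. An RdkafkaError is deliberately not rescued here, since it signals that delivery failed rather than that it is still in progress.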