Class: Rdkafka::Producer::DeliveryHandle

Inherits:
FFI::Struct
  • Object
show all
Defined in:
lib/rdkafka/producer/delivery_handle.rb

Overview

Handle to wait for a delivery report which is returned when producing a message.

Defined Under Namespace

Classes: WaitTimeoutError

Constant Summary collapse

REGISTRY =
{}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.register(address, handle) ⇒ Object



17
18
19
# File 'lib/rdkafka/producer/delivery_handle.rb', line 17

def self.register(address, handle)
  REGISTRY[address] = handle
end

.remove(address) ⇒ Object



21
22
23
# File 'lib/rdkafka/producer/delivery_handle.rb', line 21

def self.remove(address)
  REGISTRY.delete(address)
end

Instance Method Details

#pending?Boolean

Whether the delivery handle is still pending.

Returns:

  • (Boolean)


28
29
30
# File 'lib/rdkafka/producer/delivery_handle.rb', line 28

def pending?
  self[:pending]
end

#wait(max_wait_timeout: 60, wait_timeout: 0.1) ⇒ DeliveryReport

Wait for the delivery report or raise an error if this takes longer than the timeout. If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message. In this case it is possible to call wait again.

Parameters:

  • max_wait_timeout (Numeric, nil) (defaults to: 60)

    Amount of time to wait before timing out. If this is nil it does not time out.

  • wait_timeout (Numeric) (defaults to: 0.1)

    Amount of time we should wait before we recheck if there is a delivery report available

Returns:

Raises:

  • (RdkafkaError)

    When delivering the message failed

  • (WaitTimeoutError)

    When the timeout has been reached and the handle is still pending



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rdkafka/producer/delivery_handle.rb', line 43

def wait(max_wait_timeout: 60, wait_timeout: 0.1)
  timeout = if max_wait_timeout
              CURRENT_TIME.call + max_wait_timeout
            else
              nil
            end
  loop do
    if pending?
      if timeout && timeout <= CURRENT_TIME.call
        raise WaitTimeoutError.new("Waiting for delivery timed out after #{max_wait_timeout} seconds")
      end
      sleep wait_timeout
    elsif self[:response] != 0
      raise RdkafkaError.new(self[:response])
    else
      return DeliveryReport.new(self[:partition], self[:offset])
    end
  end
end