Class: Karafka::Connection::Proxy

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/karafka/connection/proxy.rb

Overview

Usually it is fine to use `Rdkafka::Consumer` directly because we need its functionality 1:1. There are, however, cases where we want extra recoveries or other handling of errors and settings. This is where this class comes in handy.

We do not want to wrap and delegate everything via a proxy object for performance reasons, but we still want to be able to alter some functionality. This wrapper lets us do that where needed.
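The selective-override idea can be illustrated with a minimal sketch built on Ruby's standard `SimpleDelegator`. The `FakeClient` and `FetchProxy` names below are hypothetical stand-ins invented for illustration; they are not part of Karafka or rdkafka.

```ruby
require 'delegate'

# Hypothetical stand-in for a client object (not the real Rdkafka::Consumer)
class FakeClient
  def name
    'fake-client'
  end

  def fetch
    'raw result'
  end
end

# Hypothetical proxy: overrides only #fetch, everything else is delegated as-is
class FetchProxy < SimpleDelegator
  def fetch
    # Extra handling lives here; the wrapped object still does the real work
    "handled(#{__getobj__.fetch})"
  end
end

proxy = FetchProxy.new(FakeClient.new)
proxy.name  # delegated straight to FakeClient#name
proxy.fetch # goes through the override defined in the proxy
```

Only the methods that need extra handling are redefined, so the remaining calls pay just the cost of plain delegation.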

Constructor Details

#initialize(obj) ⇒ Proxy

Returns a new instance of Proxy.

Parameters:

  • obj (Rdkafka::Consumer, Proxy)

    rdkafka consumer or consumer wrapped with proxy



# File 'lib/karafka/connection/proxy.rb', line 26

def initialize(obj)
  super
  # Do not allow for wrapping proxy with a proxy. This will prevent a case where we might
  # wrap an already wrapped object with another proxy level. Simplifies passing consumers
  # and makes it safe to wrap without type checking
  @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
  @config = ::Karafka::App.config.internal.connection.proxy
end
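The unwrapping rule in the constructor can be checked with a small self-contained sketch. `UnwrappingProxy` is a hypothetical class that mirrors only the wrapping logic shown above and omits the Karafka configuration lookup; the string stands in for a consumer.

```ruby
require 'delegate'

# Hypothetical proxy that mirrors only the unwrapping logic of the constructor
class UnwrappingProxy < SimpleDelegator
  attr_reader :wrapped
  alias __getobj__ wrapped

  def initialize(obj)
    super
    # When given another proxy of the same class, reuse its inner object
    @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
  end
end

inner = 'consumer' # stand-in for a real consumer object
once = UnwrappingProxy.new(inner)
twice = UnwrappingProxy.new(once)

twice.wrapped.equal?(inner) # the second proxy points at the original object
twice.upcase                # delegated directly to the inner string
```

Because the second proxy holds the original object rather than the first proxy, calls never pass through more than one delegation layer.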

Instance Attribute Details

#wrapped ⇒ Object

Also known as: __getobj__

Returns the value of attribute wrapped.



# File 'lib/karafka/connection/proxy.rb', line 21

def wrapped
  @wrapped
end

Instance Method Details

#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList

Similar to `#query_watermark_offsets`, this method can be sensitive to latency, so it is handled the same way: with a custom timeout and retries on broker errors.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)

    tpl to get time offsets

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    tpl with time offsets



# File 'lib/karafka/connection/proxy.rb', line 59

def offsets_for_times(tpl)
  l_config = @config.offsets_for_times

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.offsets_for_times(tpl, l_config.timeout)
  end
end

#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>

Proxies `#query_watermark_offsets` with extra recovery from timeout problems. We impose our own custom timeout to make sure that high-latency and overloaded clusters can handle our requests.

Parameters:

  • topic (String)

    topic name

  • partition (Partition)

Returns:

  • (Array<Integer, Integer>)

    watermark offsets



# File 'lib/karafka/connection/proxy.rb', line 42

def query_watermark_offsets(topic, partition)
  l_config = @config.query_watermark_offsets

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.query_watermark_offsets(topic, partition, l_config.timeout)
  end
end
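The body of `with_broker_errors_retry` is not shown on this page, so the following is only a rough sketch of how a retry wrapper with a wait time in seconds and an attempt limit might behave. The `with_retry` helper and the `TransientBrokerError` class are hypothetical and are not Karafka's implementation.

```ruby
# Hypothetical error standing in for a transient broker failure
class TransientBrokerError < StandardError; end

# Hypothetical helper: runs the block up to max_attempts times and sleeps
# wait_time seconds between failed attempts; re-raises on the final failure
def with_retry(wait_time:, max_attempts:)
  attempt = 0

  begin
    attempt += 1
    yield(attempt)
  rescue TransientBrokerError
    raise if attempt >= max_attempts

    sleep(wait_time)
    retry
  end
end

wait_time_ms = 10 # configured in milliseconds, as in the methods above

result = with_retry(
  # Kernel#sleep expects seconds, hence the division by 1_000.to_f
  wait_time: wait_time_ms / 1_000.to_f,
  max_attempts: 3
) do |attempt|
  raise TransientBrokerError, 'timed out' if attempt < 3

  "succeeded on attempt #{attempt}"
end
```

Dividing by `1_000.to_f` rather than `1_000` avoids integer division, which would turn any wait time below one second into zero.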