Class: Karafka::Connection::Proxy
- Inherits: SimpleDelegator
  - Object
  - SimpleDelegator
  - Karafka::Connection::Proxy
- Defined in: lib/karafka/connection/proxy.rb
Overview
Usually it is fine to use `Rdkafka::Consumer` directly, because we need its functionality 1:1. There are, however, cases where we want extra recovery or other handling of errors and settings. This is where this class comes in handy.
We do not want to wrap and delegate everything via a proxy object, for performance reasons, but we still want to be able to alter some functionality. This wrapper lets us do that where it is needed.
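As a rough illustration of the idea, the sketch below uses Ruby's standard `SimpleDelegator` to override a single method while every other call passes straight through to the wrapped object. `FakeConsumer` and `SketchProxy` are hypothetical names made up for this example, and the 5_000 ms timeout is an arbitrary assumption. None of it is Karafka's actual code.

```ruby
require 'delegate'

# Hypothetical stand-in for Rdkafka::Consumer (illustration only)
class FakeConsumer
  attr_reader :last_timeout

  def subscribe(topic)
    "subscribed to #{topic}"
  end

  def query_watermark_offsets(_topic, _partition, timeout_ms = 100)
    # Remember which timeout the caller used so the override is observable
    @last_timeout = timeout_ms
    [0, 42]
  end
end

# Hypothetical proxy: alters one method, delegates everything else untouched
class SketchProxy < SimpleDelegator
  def query_watermark_offsets(topic, partition)
    # Impose our own (assumed) timeout instead of the client default
    __getobj__.query_watermark_offsets(topic, partition, 5_000)
  end
end
```

Here `SketchProxy.new(FakeConsumer.new).subscribe('events')` is forwarded unchanged, while `query_watermark_offsets` goes through the overriding method first.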
Instance Attribute Summary
- #wrapped ⇒ Object (also: #__getobj__)
  Returns the value of attribute wrapped.
Instance Method Summary
- #initialize(obj) ⇒ Proxy (constructor)
  A new instance of Proxy.
- #offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList
  Similar to `#query_watermark_offsets`, this method can be sensitive to latency.
- #query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>
  Proxies `#query_watermark_offsets` with extra recovery from timeout problems.
Constructor Details
#initialize(obj) ⇒ Proxy
Returns a new instance of Proxy.
  # File 'lib/karafka/connection/proxy.rb', line 26
  def initialize(obj)
    super
    # Do not allow for wrapping proxy with a proxy. This will prevent a case where we might
    # wrap an already wrapped object with another proxy level. Simplifies passing consumers
    # and makes it safe to wrap without type checking
    @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
    @config = ::Karafka::App.config.internal.connection.proxy
  end
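The effect of the unwrapping check in the constructor can be sketched in isolation. The example below is a simplified assumption rather than the library's code: `UnwrappingProxy` and `Target` are hypothetical names, and the sketch passes the unwrapped object to `super` instead of keeping a separate `@wrapped` attribute.

```ruby
require 'delegate'

# Hypothetical proxy that never nests: wrapping a proxy reuses its inner object
class UnwrappingProxy < SimpleDelegator
  def initialize(obj)
    # If we were handed a proxy of our own class, reach through to its target
    super(obj.is_a?(self.class) ? obj.__getobj__ : obj)
  end
end

# Hypothetical target object standing in for a consumer
Target = Struct.new(:name)

inner = Target.new('consumer')
once = UnwrappingProxy.new(inner)
twice = UnwrappingProxy.new(once)
```

After this runs, `twice` delegates directly to `inner`, not to `once`, so callers can wrap a consumer without first checking whether it is already wrapped.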
Instance Attribute Details
#wrapped ⇒ Object Also known as: __getobj__
Returns the value of attribute wrapped.
  # File 'lib/karafka/connection/proxy.rb', line 21
  def wrapped
    @wrapped
  end
Instance Method Details
#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList
Similar to `#query_watermark_offsets`, this method can be sensitive to latency. We handle this the same way.
  # File 'lib/karafka/connection/proxy.rb', line 59
  def offsets_for_times(tpl)
    l_config = @config.offsets_for_times

    with_broker_errors_retry(
      # required to be in seconds, not ms
      wait_time: l_config.wait_time / 1_000.to_f,
      max_attempts: l_config.max_attempts
    ) do
      @wrapped.offsets_for_times(tpl, l_config.timeout)
    end
  end
#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>
Proxies `#query_watermark_offsets` with extra recovery from timeout problems. We impose our own custom timeout to make sure that high-latency and overloaded clusters can handle our requests.
  # File 'lib/karafka/connection/proxy.rb', line 42
  def query_watermark_offsets(topic, partition)
    l_config = @config.query_watermark_offsets

    with_broker_errors_retry(
      # required to be in seconds, not ms
      wait_time: l_config.wait_time / 1_000.to_f,
      max_attempts: l_config.max_attempts
    ) do
      @wrapped.query_watermark_offsets(topic, partition, l_config.timeout)
    end
  end
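Both methods above hand their work to `with_broker_errors_retry`, whose body is not shown on this page. The sketch below is only a guess at the general shape of such a helper: a bounded retry loop that sleeps `wait_time` seconds between attempts. `with_retry_sketch` and `TransientBrokerError` are hypothetical names, and which errors the real helper treats as retryable is an assumption left open here.

```ruby
# Hypothetical error type standing in for a retryable broker error
class TransientBrokerError < StandardError; end

# Hypothetical helper, not Karafka's implementation: runs the block up to
# max_attempts times, sleeping wait_time seconds between failed attempts
def with_retry_sketch(max_attempts:, wait_time:)
  attempt = 0

  begin
    attempt += 1
    yield
  rescue TransientBrokerError
    # Give up and re-raise once the attempt budget is used up
    raise if attempt >= max_attempts

    sleep(wait_time)
    retry
  end
end

# The config value is in milliseconds while sleep expects seconds,
# hence the same division by 1_000.to_f as in the methods above
wait_time_ms = 10
calls = 0

result = with_retry_sketch(max_attempts: 3, wait_time: wait_time_ms / 1_000.to_f) do
  calls += 1
  # Simulate two timeouts before the broker answers
  raise TransientBrokerError, 'timed out' if calls < 3

  [0, 42]
end
```

In this run the block fails twice and succeeds on the third attempt. With a smaller `max_attempts`, the last error would propagate to the caller.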