Module: Karafka::Pro::Routing::Features::AdaptiveIterator::Topic

Defined in:
lib/karafka/pro/routing/features/adaptive_iterator/topic.rb

Overview

Topic extension that allows us to enable and configure the adaptive iterator.

Instance Method Details

#adaptive_iterator(active: false, safety_margin: 10, marking_method: :mark_as_consumed, clean_after_yielding: true) ⇒ Object

Parameters:

  • active (Boolean) (defaults to: false)

    Whether the automatic adaptive iterator should be used for this topic.

  • safety_margin (Integer) (defaults to: 10)

    Percentage of the available time that we reserve so we can safely communicate back with Kafka, etc. With the default of 10, we stop and seek back once 90% of the time has been used, leaving 10% for post-processing operations so we have room before we hit max.poll.interval.ms.

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    Name of the method used to mark messages as consumed, when marking is performed.

  • clean_after_yielding (Boolean) (defaults to: true)

    Whether messages should be cleaned after they are yielded, using the cleaner API.
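
The safety_margin arithmetic can be illustrated with a minimal sketch. It assumes the margin is a percentage of max.poll.interval.ms; the helper name processing_budget_ms and the 300 000 ms interval are hypothetical values chosen for illustration and are not part of Karafka.

```ruby
# Hypothetical helper (not a Karafka API): returns how many milliseconds of
# processing fit before the iterator would stop, assuming safety_margin is a
# percentage of max.poll.interval.ms reserved for post-processing.
def processing_budget_ms(max_poll_interval_ms, safety_margin)
  max_poll_interval_ms * (100 - safety_margin) / 100
end

# Example interval of 300 000 ms (5 minutes) with the default margin of 10.
budget = processing_budget_ms(300_000, 10)
puts budget # prints 270000, leaving 30 000 ms for post-processing
```

Under this assumption, a larger margin stops processing earlier and leaves more time for marking offsets and seeking back.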



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 48

def adaptive_iterator(
  active: false,
  safety_margin: 10,
  marking_method: :mark_as_consumed,
  clean_after_yielding: true
)
  @adaptive_iterator ||= Config.new(
    active: active,
    safety_margin: safety_margin,
    marking_method: marking_method,
    clean_after_yielding: clean_after_yielding
  )
end
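
Because the method memoizes with ||=, the first call determines the stored configuration and later calls return that same object regardless of their arguments. The following self-contained sketch shows this behavior; SketchConfig and SketchTopic are hypothetical stand-ins, since the real Config class is not shown in this document.

```ruby
# Hypothetical stand-in for the real Config class.
SketchConfig = Struct.new(:active, :safety_margin, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical topic that mirrors the memoization pattern shown above.
class SketchTopic
  def initialize
    @adaptive_iterator = nil
  end

  def adaptive_iterator(active: false, safety_margin: 10)
    # Built only on the first call; afterwards the stored object is returned.
    @adaptive_iterator ||= SketchConfig.new(
      active: active,
      safety_margin: safety_margin
    )
  end
end

topic = SketchTopic.new
first = topic.adaptive_iterator(active: true, safety_margin: 15)
second = topic.adaptive_iterator(active: false)

puts second.equal?(first)  # prints true: the same object is returned
puts second.active?        # prints true: the later arguments were ignored
puts second.safety_margin  # prints 15
```

In practice this suggests configuring the feature once, since a second call with different arguments would not change the stored settings.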

#adaptive_iterator? ⇒ Boolean

Returns whether the adaptive iterator is active for this topic. The value comes from the active setting of the stored configuration, so it is false unless the feature was enabled via #adaptive_iterator.

Returns:

  • (Boolean)

    whether the adaptive iterator is active for this topic



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 64

def adaptive_iterator?
  adaptive_iterator.active?
end

#initialize ⇒ Object

This method calls the parent class initializer and then sets the @adaptive_iterator instance variable to nil. The explicit initialization to nil is an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 34

def initialize(...)
  super
  @adaptive_iterator = nil
end
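
The effect of the explicit nil assignment can be seen in a small sketch: every instance has the instance variable defined as soon as it is constructed, so all instances define their variables in the same order. SketchBase and SketchShapedTopic are hypothetical classes used only for illustration; the sketch uses a subclass, which may differ from how the real extension is attached.

```ruby
# Hypothetical parent with its own initializer arguments.
class SketchBase
  def initialize(name, consumer_group)
    @name = name
    @consumer_group = consumer_group
  end
end

# Hypothetical child that forwards all arguments and pre-defines its variable.
class SketchShapedTopic < SketchBase
  def initialize(...)
    super
    # Defined up front, so the variable exists before the first lookup.
    @adaptive_iterator = nil
  end
end

topic = SketchShapedTopic.new('events', 'example_group')
puts topic.instance_variable_defined?(:@adaptive_iterator) # prints true
puts topic.instance_variable_get(:@adaptive_iterator).nil? # prints true
```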

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the adaptive iterator configuration.

Returns:

  • (Hash)

    topic with all its native configuration options plus the adaptive iterator configuration



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 70

def to_h
  super.merge(
    adaptive_iterator: adaptive_iterator.to_h
  ).freeze
end
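
The merge-and-freeze pattern can be sketched in isolation as follows. BaseTopic, ExtendedTopic, SketchSettings, and the name: 'events' entry are hypothetical and stand in for the real topic and configuration classes, which are not shown here.

```ruby
# Hypothetical parent that exposes some native options.
class BaseTopic
  def to_h
    { name: 'events' }
  end
end

# Hypothetical stand-in for the adaptive iterator configuration.
SketchSettings = Struct.new(:active, :safety_margin, keyword_init: true)

class ExtendedTopic < BaseTopic
  def adaptive_iterator
    @adaptive_iterator ||= SketchSettings.new(active: false, safety_margin: 10)
  end

  # Adds the feature settings under their own key and freezes the result.
  def to_h
    super.merge(adaptive_iterator: adaptive_iterator.to_h).freeze
  end
end

hash = ExtendedTopic.new.to_h
puts hash.frozen?                           # prints true
puts hash[:adaptive_iterator][:safety_margin] # prints 10

begin
  hash[:extra] = 1
rescue FrozenError
  puts 'cannot modify the frozen hash'
end
```

Note that freeze is shallow in Ruby: the outer hash cannot be modified, but nested values are only immutable if they are frozen themselves.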