Module: Karafka::Pro::Routing::Features::ParallelSegments::Topic

Defined in:
lib/karafka/pro/routing/features/parallel_segments/topic.rb

Overview

Parallel segments related expansions to the topic building flow

Instance Method Summary collapse

Instance Method Details

#initialize(*args) ⇒ Object

Injects the parallel segments filter as the first filter during building of each of the topics in case parallel segments are enabled.

Parameters:

  • args (Object)

    anything accepted by the topic initializer



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/karafka/pro/routing/features/parallel_segments/topic.rb', line 34

def initialize(*args)
  super

  return unless consumer_group.parallel_segments?

  builder = lambda do |topic, _partition|
    mom = topic.manual_offset_management?

    # We have two filters for mom and non-mom scenario not to mix this logic
    filter_scope = Karafka::Pro::Processing::ParallelSegments::Filters
    filter_class = mom ? filter_scope::Mom : filter_scope::Default

    filter_class.new(
      segment_id: consumer_group.segment_id,
      partitioner: consumer_group.parallel_segments.partitioner,
      reducer: consumer_group.parallel_segments.reducer
    )
  end

  filter(builder)
end