Module: Karafka::Pro::Routing::Features::ParallelSegments::Builder

Defined in:
lib/karafka/pro/routing/features/parallel_segments/builder.rb

Overview

Expansions for the routing builder

Instance Method Summary collapse

Instance Method Details

#consumer_group(group_id, &block) ⇒ Object

Builds and saves the given consumer group, or, when parallel segments are active for it, one consumer group per parallel segment

Parameters:

  • group_id (String, Symbol)

    name for consumer group

  • block (Proc)

    proc that should be executed in the proxy context



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/pro/routing/features/parallel_segments/builder.rb', line 33

def consumer_group(group_id, &block)
  group_name = group_id.to_s

  # A group that is already registered is being re-opened here. Its parallel setup must stay
  # as it was, so we hand over to the default flow right away
  return super if find { |existing| existing.name == group_name }

  # Run the routing block against a throwaway consumer group that is never added to the
  # routing. Its only purpose is to tell us whether parallel segments were enabled
  probe_group = Karafka::Routing::ConsumerGroup.new(group_name)
  probe_target = Karafka::Routing::Proxy.new(probe_group, &block).target
  segments = probe_target.parallel_segments

  # When parallel segments are not active, the default flow applies
  return super unless segments.active?

  # Parallel segments are active: the requested group itself is not registered, one group
  # per segment is built from the same block instead
  segments.count.times do |index|
    segment_name = [group_id, segments.merge_key, index.to_s].join
    segment_group = Karafka::Routing::ConsumerGroup.new(segment_name)
    self << Karafka::Routing::Proxy.new(segment_group, &block).target
  end
end