Module: Karafka::Pro::Routing::Features::ParallelSegments::Builder
- Defined in:
- lib/karafka/pro/routing/features/parallel_segments/builder.rb
Overview
Expansions for the routing builder that add parallel segments support to consumer group definitions
Instance Method Summary
- #consumer_group(group_id, &block) ⇒ Object
Builds and saves the given consumer group.
Instance Method Details
#consumer_group(group_id, &block) ⇒ Object
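Builds and saves the given consumer group. If a group with this name already exists, it is re-opened through the default flow and its parallel setup is left unchanged. Otherwise, when parallel segments are active for the group, the group itself is not added to the routing; one consumer group per segment is built and registered instead. When parallel segments are not active, the default flow applies.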
# File 'lib/karafka/pro/routing/features/parallel_segments/builder.rb', line 33

def consumer_group(group_id, &block)
  consumer_group = find { |cg| cg.name == group_id.to_s }

  # Re-opening a CG should not change its parallel setup
  if consumer_group
    super
  else
    # We build a temp consumer group and a target to check if it has parallel segments
    # enabled and if so, we do not add it to the routing but instead we build the
    # appropriate number of parallel segment groups
    temp_consumer_group = Karafka::Routing::ConsumerGroup.new(group_id.to_s)
    temp_target = Karafka::Routing::Proxy.new(temp_consumer_group, &block).target
    config = temp_target.parallel_segments

    if config.active?
      config.count.times do |i|
        sub_name = [group_id, config.merge_key, i.to_s].join
        sub_consumer_group = Karafka::Routing::ConsumerGroup.new(sub_name)
        self << Karafka::Routing::Proxy.new(sub_consumer_group, &block).target
      end
    # If parallel segments are not active we go with the default flow
    else
      super
    end
  end
end
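The naming step in the method above can be illustrated in isolation. The following is a minimal sketch that does not load Karafka: `SegmentsConfig` and `segment_group_names` are hypothetical stand-ins, and both the `'-parallel-'` merge key and the "active when count is greater than 1" rule are assumptions made for the example, not values confirmed by this page. Only the `[group_id, config.merge_key, i.to_s].join` expression is taken from the source.

```ruby
# Hypothetical stand-in for the parallel segments config object.
# The real object is returned by `temp_target.parallel_segments`.
SegmentsConfig = Struct.new(:count, :merge_key, keyword_init: true) do
  # Assumption for this sketch: segments are active when more than one is requested.
  def active?
    count > 1
  end
end

# Hypothetical helper mirroring the naming logic of #consumer_group.
# Returns the names of the consumer groups that would be registered.
def segment_group_names(group_id, config)
  # Default flow: a single group under its original name.
  return [group_id.to_s] unless config.active?

  # One group per segment, named "<group_id><merge_key><index>".
  Array.new(config.count) { |i| [group_id, config.merge_key, i.to_s].join }
end

# The merge key value here is an assumption chosen for illustration.
config = SegmentsConfig.new(count: 3, merge_key: '-parallel-')
puts segment_group_names(:orders, config)
```

Because the same routing block is applied to every generated group, each segment group presumably ends up with an identical topic setup and differs only in its name.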