Class: Karafka::Pro::Routing::Features::Patterns::Detector

Inherits:
Object
Defined in:
lib/karafka/pro/routing/features/patterns/detector.rb

Overview

Note:

This class is NOT thread-safe and should only run in a thread-safe context that guarantees there won’t be any race conditions.

Detects whether a given topic matches any of the patterns and, if so, injects it into the routing of the given subscription group.
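
The detect-and-inject idea described above can be illustrated with a minimal, self-contained sketch. It does not use Karafka's classes: `PatternRule`, `find_matching_rule`, and `expand_routing` are hypothetical names invented for this example, and the routing is modelled as a plain array.

```ruby
# Hypothetical stand-in for a routing pattern; not a Karafka class.
PatternRule = Struct.new(:name, :regexp)

# Returns the first rule whose regexp matches the topic name, or nil.
def find_matching_rule(rules, topic_name)
  rules.find { |rule| rule.regexp.match?(topic_name) }
end

# Appends the topic to the routing list when a rule matches.
# Returns the matched rule, or nil when nothing was added.
def expand_routing(routing, rules, topic_name)
  rule = find_matching_rule(rules, topic_name)
  return nil unless rule

  routing << { topic: topic_name, from_pattern: rule.name }
  rule
end

rules = [PatternRule.new(:events, /\Aevents\..+\z/)]
routing = []

expand_routing(routing, rules, 'events.orders') # matches, routing grows
expand_routing(routing, rules, 'logs.orders')   # no match, routing unchanged
```

In this sketch a topic that matches no rule leaves the routing untouched, which mirrors the "if so" condition in the description.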

Instance Method Summary

Instance Method Details

#expand(sg_topics, new_topic) ⇒ Object

Checks whether the provided topic matches any of the patterns and, when a match is detected, expands the routing with it.

# File 'lib/karafka/pro/routing/features/patterns/detector.rb', line 29

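def expand(sg_topics, new_topic)
  # Only one expansion runs at a time
  MUTEX.synchronize do
    # Collect the patterns of active matcher topics. Return early when there
    # are none or when no pattern matches new_topic; otherwise install the
    # new topic into the subscription group routing.
    sg_topics
      .map(&:patterns)
      .select(&:active?)
      .select(&:matcher?)
      .map(&:pattern)
      .then { |pts| pts.empty? ? return : pts }
      .then { |pts| Patterns.new(pts) }
      .find(new_topic)
      .then { |pattern| pattern || return }
      .then { |pattern| install(pattern, new_topic, sg_topics) }
  end
end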
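
The method above relies on two plain Ruby behaviours: a `return` inside a block passed to `then` exits the enclosing method (returning `nil`), and `Mutex#synchronize` releases the lock even when the block is left early. The sketch below shows both in isolation; `LOCK` and `first_even_doubled` are hypothetical names for this illustration and are unrelated to Karafka.

```ruby
LOCK = Mutex.new

# Returns the first even number doubled, or nil when there is no even number.
def first_even_doubled(numbers)
  LOCK.synchronize do
    numbers
      .select(&:even?)
      # A bare `return` in a block leaves the whole method with nil
      .then { |evens| evens.empty? ? return : evens }
      .first
      .then { |n| n * 2 }
  end
end

no_match = first_even_doubled([1, 3])    # early return path
match    = first_even_doubled([1, 4, 6]) # full pipeline path
released = !LOCK.locked?                 # lock is free after both calls
```

Because the early exit still passes through `synchronize`, the lock is not left held when the pipeline stops at one of the guard steps.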