Class: Karafka::Persistence::Topic

Inherits:
Object
Defined in:
lib/karafka/persistence/topic.rb

Overview

Local cache for routing topics. We use it to avoid building new string instances and remapping the incoming topic name each time a message or message batch is received.

Constant Summary

PERSISTENCE_SCOPE = :topics

Thread.current scope under which we store topics data.
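
To illustrate what storing data under a Thread.current scope implies, here is a minimal sketch in plain Ruby, without Karafka. The `SKETCH_SCOPE` constant and the two helper methods are hypothetical names introduced only for this example, and the stored value is made up.

```ruby
# Hypothetical stand-in for PERSISTENCE_SCOPE (assumption, not Karafka code).
SKETCH_SCOPE = :sketch_scope

# Stores a made-up cache entry under the scope in the calling thread.
def store_in_current_thread
  Thread.current[SKETCH_SCOPE] = { 'group' => { 'events' => :cached_topic } }
end

# Reads the same scope key from a freshly started thread.
def read_from_new_thread
  Thread.new { Thread.current[SKETCH_SCOPE] }.value
end

store_in_current_thread
```

In this sketch the new thread does not see the value stored by the calling thread, so a cache kept under such a scope is presumably rebuilt separately in each thread that uses it.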

Class Method Summary

Class Method Details

.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topic



# File 'lib/karafka/persistence/topic.rb', line 17

def self.fetch(group_id, raw_topic_name)
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

  Thread.current[PERSISTENCE_SCOPE][group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
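
The method above combines a thread-local hash that has a default block with `||=` memoization. The following is a minimal, self-contained sketch of that pattern. `FakeMapper`, `FakeRouter`, and `TopicCacheSketch` are hypothetical stand-ins for `Karafka::App.config.topic_mapper`, `Routing::Router`, and this class; their behavior (prefix stripping, string results) is assumed for illustration and does not reproduce Karafka's real implementations.

```ruby
# Hypothetical stand-in for the topic mapper (assumption, not Karafka's class).
module FakeMapper
  # Pretends to strip a namespace prefix from the incoming topic name.
  def self.incoming(raw_topic_name)
    raw_topic_name.sub(/\Aprefix\./, '')
  end
end

# Hypothetical stand-in for the router (assumption, not Karafka's class).
module FakeRouter
  @lookups = 0

  class << self
    attr_reader :lookups

    # Counts calls so the effect of caching is observable.
    def find(id)
      @lookups += 1
      "topic<#{id}>"
    end
  end
end

module TopicCacheSketch
  SCOPE = :sketch_topics

  def self.fetch(group_id, raw_topic_name)
    # The outer hash creates an empty per-group hash on first access.
    Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

    # The block runs only when no truthy value is cached for this pair.
    Thread.current[SCOPE][group_id][raw_topic_name] ||= begin
      mapped_topic_name = FakeMapper.incoming(raw_topic_name)
      FakeRouter.find("#{group_id}_#{mapped_topic_name}")
    end
  end
end

first = TopicCacheSketch.fetch('group', 'prefix.events')
second = TopicCacheSketch.fetch('group', 'prefix.events')
```

In this sketch the second call returns the object cached by the first, and the lookup counter is incremented only once for that pair. Because `||=` only keeps truthy values, a lookup that returned `nil` would be repeated on every call.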