Class: Karafka::Persistence::Topic
- Inherits:
-
Object
- Object
- Karafka::Persistence::Topic
- Defined in:
- lib/karafka/persistence/topic.rb
Overview
Local cache for routing topics We use it in order not to build string instances and remap incoming topic upon each message / message batches received
Constant Summary collapse
- PERSISTENCE_SCOPE =
Thread.current scope under which we store topics data
:topics
Class Method Summary collapse
-
.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topic
Remapped topic representation that can be used further on when working with given parameters.
Class Method Details
.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topic
Returns remapped topic representation that can be used further on when working with given parameters.
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/karafka/persistence/topic.rb', line 17 def self.fetch(group_id, raw_topic_name) Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} } Thread.current[PERSISTENCE_SCOPE][group_id][raw_topic_name] ||= begin # We map from incoming topic name, as it might be namespaced, etc. # @see topic_mapper internal docs mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name) Routing::Router.find("#{group_id}_#{mapped_topic_name}") end end |