Class: Karafka::Persistence::Topics
- Inherits: Object
- Defined in: lib/karafka/persistence/topics.rb
Overview
Local cache for routing topics. We use it to avoid building new string instances and remapping the incoming topic name each time a message or message batch is received.
Class Method Summary
- .clear ⇒ Object
  Clears the whole topics cache for all the threads. This is used for in-development code reloading, as we need to get rid of all the preloaded and cached object instances for reloading to work.
- .current ⇒ Concurrent::Hash
  Hash with all the topics from the given groups.
- .fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topics
  Remapped topic representation that can be used further on when working with the given parameters.
Class Method Details
.clear ⇒ Object
Clears the whole topics cache for all the threads. This is used for in-development code reloading, as we need to get rid of all the preloaded and cached object instances for reloading to work.
# File 'lib/karafka/persistence/topics.rb', line 39
def clear
  Thread
    .list
    .select { |thread| thread[PERSISTENCE_SCOPE] }
    .each { |thread| thread[PERSISTENCE_SCOPE].clear }
end
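The cross-thread clearing done by .clear can be tried out in isolation. The following is a minimal sketch, not Karafka's code: `CLEAR_SKETCH_SCOPE` and `clear_all` are hypothetical stand-ins for `PERSISTENCE_SCOPE` and `.clear`, and plain `Hash` instances replace `Concurrent::Hash` so that the example needs no gems.

```ruby
# Hypothetical stand-in for PERSISTENCE_SCOPE; the real value is not shown in this page.
CLEAR_SKETCH_SCOPE = :clear_sketch_scope

# Same shape as .clear: visit every live thread that owns a cache and empty it.
def clear_all
  Thread
    .list
    .select { |thread| thread[CLEAR_SKETCH_SCOPE] }
    .each { |thread| thread[CLEAR_SKETCH_SCOPE].clear }
end

ready = Queue.new
release = Queue.new

worker = Thread.new do
  # The worker fills its own thread-local cache (plain Hash assumed here).
  Thread.current[CLEAR_SKETCH_SCOPE] = { 'group_a' => { 'topic_x' => :cached } }
  ready << true
  release.pop # stay alive until the main thread has cleared the caches
  Thread.current[CLEAR_SKETCH_SCOPE].size
end

ready.pop
Thread.current[CLEAR_SKETCH_SCOPE] = { 'group_b' => {} }

clear_all
release << true

worker_size = worker.value
main_size = Thread.current[CLEAR_SKETCH_SCOPE].size
```

Only threads that are still alive appear in `Thread.list`, which is why the worker is kept blocked on a queue until the clearing has run.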
.current ⇒ Concurrent::Hash
Returns a hash with all the topics from the given groups.
# File 'lib/karafka/persistence/topics.rb', line 16
def current
  Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
    hash[key] = Concurrent::Hash.new
  end
end
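The per-thread, auto-populating structure returned by .current can be illustrated with a small sketch. It assumes plain `Hash` in place of `Concurrent::Hash` to stay dependency-free, and `CACHE_SKETCH_SCOPE` and `current_cache` are hypothetical names, not part of Karafka.

```ruby
# Hypothetical stand-in for PERSISTENCE_SCOPE; the real value is not shown in this page.
CACHE_SKETCH_SCOPE = :cache_sketch_scope

# Returns the cache of the calling thread, creating it on first access.
# The default block creates the inner per-group hash the first time a group is read.
def current_cache
  Thread.current[CACHE_SKETCH_SCOPE] ||= Hash.new do |hash, key|
    hash[key] = {}
  end
end

# No explicit setup of the 'group_a' level is needed before writing into it.
current_cache['group_a']['topic_x'] = :cached
seen_in_main = current_cache['group_a'].key?('topic_x')

# A different thread gets its own, initially empty cache.
seen_in_other = Thread.new { current_cache['group_a'].key?('topic_x') }.value
```

Because each thread owns its cache, reads and writes on the hot path never contend with other threads; the cost is that every thread fills its cache independently.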
.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topics
Returns a remapped topic representation that can be used further on when working with the given parameters.
# File 'lib/karafka/persistence/topics.rb', line 27
def fetch(group_id, raw_topic_name)
  current[group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
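The memoization in .fetch means that the mapping and the router lookup run once per group and raw topic name in a given thread. Below is a rough sketch of that behavior under stated assumptions: `map_incoming`, `find_route`, `ROUTES` and the `prod.` prefix are all hypothetical stand-ins for the topic mapper and `Routing::Router.find`, whose real behavior is not shown in this page.

```ruby
# Hypothetical routing table and lookup counter, used only for this sketch.
ROUTES = { 'group_a_events' => :events_topic }.freeze
LOOKUPS = Hash.new(0)

# Pretend mapper: strips an assumed "prod." namespace prefix from the raw name.
def map_incoming(raw_topic_name)
  raw_topic_name.delete_prefix('prod.')
end

# Pretend router: counts how often each id is actually looked up.
def find_route(id)
  LOOKUPS[id] += 1
  ROUTES.fetch(id)
end

# Per-thread nested cache (plain Hash assumed instead of Concurrent::Hash).
def fetch_cache
  Thread.current[:fetch_sketch_scope] ||= Hash.new { |hash, key| hash[key] = {} }
end

# Same shape as .fetch: compute on the first call, reuse the stored value afterwards.
def fetch_topic(group_id, raw_topic_name)
  fetch_cache[group_id][raw_topic_name] ||= begin
    mapped_topic_name = map_incoming(raw_topic_name)
    find_route("#{group_id}_#{mapped_topic_name}")
  end
end

first = fetch_topic('group_a', 'prod.events')
second = fetch_topic('group_a', 'prod.events')
lookups = LOOKUPS['group_a_events']
```

Note that `||=` only keeps truthy values, so a lookup that returned `nil` or `false` would be repeated on the next call.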