Class: Karafka::Persistence::Topic

Inherits:
Object
Defined in:
lib/karafka/persistence/topic.rb

Overview

Local cache for routing topics. We use it to avoid building new string instances and remapping the incoming topic name for each message or message batch received.

Constant Summary

PERSISTENCE_SCOPE = :topics

Thread.current scope under which we store topics data
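
Because the scope is a key into Thread.current, each thread holds its own copy of the cached data. The following is a minimal sketch of that behavior only; `SCOPE_KEY` and the stored values are hypothetical placeholders, not Karafka objects.

```ruby
# Hypothetical stand-in for the PERSISTENCE_SCOPE constant.
SCOPE_KEY = :topics

# Data stored under the key in the main thread...
Thread.current[SCOPE_KEY] = { 'group' => { 'events' => :cached_topic } }

# ...is not visible from another thread, which starts with an empty slot.
seen_in_other_thread = Thread.new { Thread.current[SCOPE_KEY] }.value

main_value = Thread.current[SCOPE_KEY]
```

One likely consequence of this design is that no locking is needed around the cache, at the cost of each thread resolving a given topic once on its own.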

Class Method Summary

Class Method Details

.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topic

Returns remapped topic representation that can be used further on when working with given parameters.

Parameters:

  • group_id (String)

    group id for which we fetch a topic representation

  • raw_topic_name (String)

    raw topic name (before remapping) for which we fetch a topic representation

Returns:

  • (Karafka::Routing::Topic)

    remapped topic representation that can be used further on when working with given parameters



# File 'lib/karafka/persistence/topic.rb', line 17

def self.fetch(group_id, raw_topic_name)
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

  Thread.current[PERSISTENCE_SCOPE][group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
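
The caching pattern used by the method above can be tried in isolation with a sketch like the one below. `SketchMapper`, `SketchRouter`, `SketchTopicCache`, the `prefix.` namespace, and the returned strings are all hypothetical stand-ins for illustration; the real method relies on the configured Karafka topic mapper and `Routing::Router`.

```ruby
# Hypothetical mapper: strips an assumed "prefix." namespace from the raw name.
module SketchMapper
  def self.incoming(raw_topic_name)
    raw_topic_name.sub(/\Aprefix\./, '')
  end
end

# Hypothetical router: counts lookups so the effect of caching is observable.
module SketchRouter
  @lookups = 0

  class << self
    attr_reader :lookups

    def find(topic_id)
      @lookups += 1
      "topic:#{topic_id}"
    end
  end
end

module SketchTopicCache
  SCOPE = :sketch_topics

  def self.fetch(group_id, raw_topic_name)
    # Outer hash creates an empty per-group hash on first access.
    Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

    # The block runs only when nothing is cached for this raw topic name yet.
    Thread.current[SCOPE][group_id][raw_topic_name] ||= begin
      mapped = SketchMapper.incoming(raw_topic_name)
      SketchRouter.find("#{group_id}_#{mapped}")
    end
  end
end

first = SketchTopicCache.fetch('app', 'prefix.events')
second = SketchTopicCache.fetch('app', 'prefix.events')
```

In this sketch the second call returns the same object as the first and the router is consulted only once. Note that `||=` would re-run the lookup on every call if the router returned `nil` or `false`.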