Class: Karafka::Persistence::Topics

Inherits:
Object
  • Object
Defined in:
lib/karafka/persistence/topics.rb

Overview

Local cache for routing topics. We use it to avoid building new string instances and remapping the incoming topic name each time a message or message batch is received.

Class Method Details

.clear ⇒ Object

Clears the whole topics cache for all threads. This is used for in-development code reloading, where all preloaded and cached object instances must be dropped for reloading to work.



# File 'lib/karafka/persistence/topics.rb', line 39

def clear
  Thread
    .list
    .select { |thread| thread[PERSISTENCE_SCOPE] }
    .each { |thread| thread[PERSISTENCE_SCOPE].clear }
end
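
The cross-thread clearing described above can be tried in isolation. The following is a minimal sketch, not Karafka's own code: the `ClearSketch` module and its `SCOPE` key are hypothetical stand-ins (the real `PERSISTENCE_SCOPE` value is not shown on this page), and a plain `Hash` is used as the per-thread cache.

```ruby
# Illustrative sketch only, not Karafka's code.
module ClearSketch
  # Hypothetical scope key; the real PERSISTENCE_SCOPE value is not shown here.
  SCOPE = :clear_sketch_cache

  # Empties the cache of every live thread that has one.
  def self.clear
    Thread
      .list
      .select { |thread| thread[SCOPE] }
      .each { |thread| thread[SCOPE].clear }
  end
end

ready = Queue.new
release = Queue.new

# A worker that fills its own thread-local cache and then waits.
worker = Thread.new do
  Thread.current[ClearSketch::SCOPE] = { 'group' => { 'events' => :topic } }
  ready << true
  release.pop
  Thread.current[ClearSketch::SCOPE]
end

ready.pop          # wait until the worker has populated its cache
ClearSketch.clear  # called from the main thread
release << true
worker.value       # the worker's hash, emptied from outside the worker
```

Note that the hashes are emptied in place rather than replaced, so each thread keeps the same cache object and simply repopulates it on the next lookup.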

.current ⇒ Concurrent::Hash

Returns the current thread's hash with all the cached topics, keyed by group id.

Returns:

  • (Concurrent::Hash)

    the current thread's hash with all the cached topics, keyed by group id



# File 'lib/karafka/persistence/topics.rb', line 16

def current
  Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
    hash[key] = Concurrent::Hash.new
  end
end
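
The shape of this structure is easier to see in a small experiment. The sketch below is an assumption-laden illustration: `CurrentSketch` and its `SCOPE` key are hypothetical names, and a plain `Hash` stands in for `Concurrent::Hash` so that the example needs no gems. It shows the two properties the method relies on: an inner hash is created on first access to a group, and each thread gets its own outer hash.

```ruby
# Illustrative sketch only, not Karafka's code.
module CurrentSketch
  # Hypothetical scope key used for the thread-local storage.
  SCOPE = :current_sketch_cache

  # Lazily builds one cache per thread. Reading an unknown group id
  # creates and stores an empty inner hash for that group.
  # A plain Hash stands in for Concurrent::Hash here.
  def self.current
    Thread.current[SCOPE] ||= Hash.new do |hash, key|
      hash[key] = {}
    end
  end
end

cache = CurrentSketch.current

# Writing through a group that was never seen before works
# because the default block creates the inner hash on first read.
cache['group_a']['events'] = :topic_object

# Another thread does not see this data; it builds its own outer hash.
seen_elsewhere = Thread.new { CurrentSketch.current.key?('group_a') }.value
```

One consequence of the default block is that merely reading a group id stores an empty entry for it, so `key?` is the safer check when the goal is to inspect the cache without growing it.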

.fetch(group_id, raw_topic_name) ⇒ Karafka::Routing::Topic

Returns the remapped topic representation that can be used further on when working with the given parameters.

Parameters:

  • group_id (String)

    group id for which we fetch a topic representation

  • raw_topic_name (String)

    raw topic name (before remapping) for which we fetch a topic representation

Returns:

  • (Karafka::Routing::Topic)

    the remapped topic representation that can be used further on when working with the given parameters



# File 'lib/karafka/persistence/topics.rb', line 27

def fetch(group_id, raw_topic_name)
  current[group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
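
To illustrate the memoization in `fetch`, here is a self-contained sketch under stated assumptions. Everything in `FetchSketch` is hypothetical: `incoming` imitates a topic mapper by stripping a made-up `prod.` prefix, `find` imitates a router lookup and counts how often it is called, and a plain `Hash` replaces `Concurrent::Hash`. None of these reflect the actual Karafka implementations.

```ruby
# Illustrative sketch only, not Karafka's code.
module FetchSketch
  SCOPE = :fetch_sketch_cache # hypothetical thread-local key
  LOOKUPS = Hash.new(0)       # counts calls to the stand-in router

  # Hypothetical mapper: strips a made-up "prod." namespace prefix.
  def self.incoming(raw_topic_name)
    raw_topic_name.sub(/\Aprod\./, '')
  end

  # Hypothetical router: returns a string instead of a topic object.
  def self.find(topic_id)
    LOOKUPS[topic_id] += 1
    "topic<#{topic_id}>"
  end

  # Per-thread nested cache (plain Hash in place of Concurrent::Hash).
  def self.current
    Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
  end

  # Maps and looks up the topic only on a cache miss; the cache is
  # keyed by the raw (unmapped) name, so hits skip the mapper entirely.
  def self.fetch(group_id, raw_topic_name)
    current[group_id][raw_topic_name] ||= begin
      mapped_topic_name = incoming(raw_topic_name)
      find("#{group_id}_#{mapped_topic_name}")
    end
  end
end

first = FetchSketch.fetch('group_a', 'prod.events')
second = FetchSketch.fetch('group_a', 'prod.events')
first.equal?(second) # the second call returns the cached object
```

Because the cache is filled with `||=`, a lookup that returns `nil` or `false` is not stored and would be repeated on every call; only truthy results are memoized.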