Class: Karafka::Persistence::Consumers

Inherits:
Object
  • Object
Defined in:
lib/karafka/persistence/consumers.rb

Overview

Class used to provide a persistent cache across batch requests for a given topic and partition. The cache stores additional details when persistent mode is turned on for that topic.

Class Method Summary

Class Method Details

.clear ⇒ Object

Note:

This is used to reload consumer instances when code reloading is enabled in development mode. It should not be used in production.

Removes all persisted consumer instances from the consumer cache.



# File 'lib/karafka/persistence/consumers.rb', line 36

def clear
  Thread
    .list
    .select { |thread| thread[PERSISTENCE_SCOPE] }
    .each { |thread| thread[PERSISTENCE_SCOPE].clear }
end
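
The clearing behaviour above can be illustrated with a minimal sketch. It assumes a hypothetical scope key (`scope_key`, standing in for `PERSISTENCE_SCOPE`, whose real value is not shown here) and plain hashes in place of the real consumer cache. Threads that never stored anything under the key are skipped.

```ruby
# Hypothetical stand-in for PERSISTENCE_SCOPE (assumption, not the real value).
scope_key = :sketch_clear_scope
ready = Queue.new

# Two worker threads, each with its own populated thread-local cache.
workers = Array.new(2) do |index|
  Thread.new do
    Thread.current[scope_key] = { "topic_#{index}" => { 0 => Object.new } }
    ready << index # signal that this thread's cache is populated
    sleep          # stay alive so the cache remains reachable
  end
end
2.times { ready.pop }

# Same shape as .clear: visit every live thread that holds a scope and empty it.
Thread
  .list
  .select { |thread| thread[scope_key] }
  .each { |thread| thread[scope_key].clear }

sizes_after = workers.map { |thread| thread[scope_key].size }
workers.each(&:kill)
```

The hashes themselves stay assigned to their threads; only their contents are dropped, so the next lookup rebuilds whatever it needs.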

.current ⇒ Hash

Returns current thread’s persistence scope hash with all the consumers.

Returns:

  • (Hash)

    current thread’s persistence scope hash with all the consumers



# File 'lib/karafka/persistence/consumers.rb', line 18

def current
  Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
    hash[key] = Concurrent::Hash.new
  end
end
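
As a rough illustration of the per-thread, auto-building hash returned here, the sketch below mirrors the method with two stated assumptions: `KEY` is a hypothetical stand-in for `PERSISTENCE_SCOPE`, and a plain `Hash` replaces `Concurrent::Hash` so that no gem is required.

```ruby
module ScopeSketch
  # Hypothetical stand-in for PERSISTENCE_SCOPE (assumption, not the real value).
  KEY = :scope_sketch

  # Plain Hash stands in for Concurrent::Hash to keep the sketch dependency-free.
  def self.current
    Thread.current[KEY] ||= Hash.new { |hash, key| hash[key] = {} }
  end
end

main_scope = ScopeSketch.current
# The inner hash for :orders is created by the default block on first access.
main_scope[:orders][0] = :consumer_a

# Repeated calls in one thread return the same object.
same_thread = ScopeSketch.current.equal?(main_scope)
# A different thread lazily builds its own, separate scope.
other_thread = Thread.new { ScopeSketch.current.equal?(main_scope) }.value
```

Each thread therefore sees only the entries it created itself, which is why `.clear` has to walk `Thread.list` rather than empty a single shared hash.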

.fetch(topic, partition) ⇒ Karafka::BaseConsumer

Builds (if not already cached) and fetches the current consumer instance that will be used to process messages from a given topic and partition.

Parameters:

  • topic (Karafka::Routing::Topic)

    topic instance for which we might cache

  • partition (Integer)

    number of partition for which we want to cache

Returns:

  • (Karafka::BaseConsumer)

    consumer instance used for the given topic and partition



# File 'lib/karafka/persistence/consumers.rb', line 29

def fetch(topic, partition)
  current[topic][partition] ||= topic.consumer.new(topic)
end
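
The memoization in `fetch` can be sketched as follows. Everything named here is hypothetical: `FetchSketch`, `SketchTopic` and `SketchConsumer` are simplified stand-ins for the real Karafka classes, and a plain `Hash` replaces `Concurrent::Hash`. The only behaviour assumed is what the source shows: a topic exposes `consumer` (a class) and the consumer is built with `new(topic)`.

```ruby
module FetchSketch
  # Hypothetical stand-in for PERSISTENCE_SCOPE (assumption, not the real value).
  KEY = :fetch_sketch

  def self.current
    Thread.current[KEY] ||= Hash.new { |hash, key| hash[key] = {} }
  end

  # Builds the consumer on the first call for a topic/partition pair, then reuses it.
  def self.fetch(topic, partition)
    current[topic][partition] ||= topic.consumer.new(topic)
  end
end

# Simplified stand-ins for a consumer class and a routing topic.
SketchConsumer = Struct.new(:topic)
SketchTopic = Struct.new(:name, :consumer)

topic = SketchTopic.new("events", SketchConsumer)

first = FetchSketch.fetch(topic, 0)
again = FetchSketch.fetch(topic, 0) # served from the cache
other = FetchSketch.fetch(topic, 1) # a different partition gets its own instance
```

Because the cache is keyed by topic first and partition second, one consumer instance is kept per topic and partition within each thread.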