Class: Karafka::Persistence::Consumer

Inherits:
Object
Defined in:
lib/karafka/persistence/consumer.rb

Overview

Class that provides a persistent, per-thread cache of consumer instances across batch requests. Instances are keyed by topic and partition, so a consumer can keep additional details between batches when persistent mode is turned on for its topic.

Constant Summary

PERSISTENCE_SCOPE = :consumers

Thread.current scope (key) under which the consumers' data is stored.

Class Method Summary

Class Method Details

.all ⇒ Hash

Returns current thread persistence scope hash with all the consumers.

Returns:

  • (Hash)

    current thread persistence scope hash with all the consumers



# File 'lib/karafka/persistence/consumer.rb', line 16

def all
  # @note This does not need to be threadsafe (Hash) as it is always executed in a
  # current thread context
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
end
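
The behavior of the hash returned by .all can be illustrated with a small stand-alone sketch. It does not load Karafka; the SCOPE constant and the top-level all method are illustrative stand-ins that only mirror the one-line body shown above, and the 'events' key and :consumer_a value are made up for the demonstration.

```ruby
# Illustrative stand-in for PERSISTENCE_SCOPE (assumption, not the library constant)
SCOPE = :consumers

# Same shape as the method above: a lazily created, per-thread hash whose
# missing keys are filled with a fresh inner hash on first access
def all
  Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
end

# The inner hash for 'events' is created automatically by the default block
all['events'][0] = :consumer_a
main_view = all

# A different thread gets its own, initially empty, hash
other_view = Thread.new { all }.value

puts main_view['events'][0]  # consumer_a
puts other_view.empty?       # true
```

Because each thread lazily builds its own hash, no locking is involved, which matches the note in the method body.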

.fetch(topic, partition) ⇒ Karafka::BaseConsumer

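Builds and/or fetches the current consumer instance that will be used to process messages from a given topic and partition. When the topic is persistent, a cached instance is reused if one exists; otherwise a new instance is built and stored on every call.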

Parameters:

  • topic (Karafka::Routing::Topic)

    topic instance for which we might cache the consumer

  • partition (Integer)

    partition number for which we want to cache the consumer

Returns:

  • (Karafka::BaseConsumer)

    consumer instance for the given topic and partition

# File 'lib/karafka/persistence/consumer.rb', line 27

def fetch(topic, partition)
  # We always store a current instance for callback reasons
  if topic.persistent
    all[topic][partition] ||= topic.consumer.new
  else
    all[topic][partition] = topic.consumer.new
  end
end
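
The difference between the two branches of .fetch can be seen in a minimal sketch that runs without Karafka. DemoTopic, DemoConsumer, and DemoPersistence are hypothetical stand-ins introduced only for this example; they copy the logic shown above and are not part of the library.

```ruby
# Hypothetical stand-ins for Karafka::Routing::Topic and a consumer class
DemoConsumer = Class.new
DemoTopic = Struct.new(:name, :persistent, :consumer)

# Stand-alone copy of the caching logic shown above (assumption: same behavior)
module DemoPersistence
  SCOPE = :demo_consumers

  def self.all
    Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
  end

  def self.fetch(topic, partition)
    if topic.persistent
      # Reuse the cached instance, building it only on the first call
      all[topic][partition] ||= topic.consumer.new
    else
      # Build a new instance on every call, but still store it
      all[topic][partition] = topic.consumer.new
    end
  end
end

persistent = DemoTopic.new('events', true, DemoConsumer)
transient  = DemoTopic.new('logs', false, DemoConsumer)

same  = DemoPersistence.fetch(persistent, 0).equal?(DemoPersistence.fetch(persistent, 0))
fresh = DemoPersistence.fetch(transient, 0).equal?(DemoPersistence.fetch(transient, 0))

puts same   # true
puts fresh  # false
```

In both cases the most recent instance remains reachable through the stored hash, which is what the "callback reasons" comment in the method body relies on.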