Class: Karafka::Persistence::Consumer
- Inherits:
- Object
  - Object
    - Karafka::Persistence::Consumer
- Defined in:
- lib/karafka/persistence/consumer.rb
Overview
Class that provides a persistent, per-thread cache across batch requests for a given topic and partition. It allows additional details to be stored between batches when persistent mode is turned on for that topic.
Constant Summary
- PERSISTENCE_SCOPE = :consumers
Thread.current scope under which consumer data is stored.
Class Method Summary
- .all ⇒ Hash
Current thread persistence scope hash with all the consumers.
- .fetch(topic, partition) ⇒ Karafka::BaseConsumer
Used to build and/or fetch the current consumer instance that will be used to process messages from a given topic and partition.
Class Method Details
.all ⇒ Hash
Returns the current thread's persistence scope hash with all the consumers.
# File 'lib/karafka/persistence/consumer.rb', line 16

def all
  # @note This does not need to be threadsafe (Hash) as it is always executed in a
  #   current thread context
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
end
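The thread-scoped hash returned by .all can be tried outside of Karafka with a small stand-in. The sketch below is illustrative only: the module name ScopedCache and the 'events' key are assumptions, and only the body of .all mirrors the listing above. It shows that the default block creates the inner per-topic hash on first access and that another thread does not see the entries.

```ruby
# Hypothetical stand-in module (not part of Karafka); only the body of .all
# mirrors the documented method.
module ScopedCache
  PERSISTENCE_SCOPE = :consumers

  def self.all
    Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
  end
end

# The default block builds the inner hash for 'events' on first access,
# so a partition entry can be assigned without any setup.
ScopedCache.all['events'][0] = :consumer_for_partition_0

# A new thread gets its own hash, so the entry stored above is not visible there.
seen_in_other_thread = Thread.new { ScopedCache.all.key?('events') }.value

puts ScopedCache.all['events'][0].inspect # => :consumer_for_partition_0
puts seen_in_other_thread.inspect         # => false
```

Because each thread holds its own hash, no locking is needed around reads and writes, which is the point made in the @note comment.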
.fetch(topic, partition) ⇒ Karafka::BaseConsumer
Used to build and/or fetch the current consumer instance that will be
used to process messages from a given topic and partition.
# File 'lib/karafka/persistence/consumer.rb', line 27

def fetch(topic, partition)
  # We always store a current instance for callback reasons
  if topic.persistent
    all[topic][partition] ||= topic.consumer.new
  else
    all[topic][partition] = topic.consumer.new
  end
end
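The difference between the two branches of .fetch can be seen with a minimal, self-contained sketch. FakeTopic, FakeConsumer and ConsumerCache are hypothetical stand-ins, not Karafka classes; they only expose the persistent and consumer attributes that the listing above relies on.

```ruby
# Hypothetical stand-ins for a Karafka consumer class and topic object.
FakeConsumer = Class.new
FakeTopic = Struct.new(:name, :persistent, :consumer)

# Hypothetical module reproducing the documented .all and .fetch logic.
module ConsumerCache
  def self.all
    Thread.current[:consumers] ||= Hash.new { |hash, key| hash[key] = {} }
  end

  def self.fetch(topic, partition)
    if topic.persistent
      # Reuse the stored instance, building it only on the first call.
      all[topic][partition] ||= topic.consumer.new
    else
      # Always build a fresh instance, but still store it.
      all[topic][partition] = topic.consumer.new
    end
  end
end

persistent_topic = FakeTopic.new('events', true, FakeConsumer)
transient_topic = FakeTopic.new('logs', false, FakeConsumer)

first = ConsumerCache.fetch(persistent_topic, 0)
puts first.equal?(ConsumerCache.fetch(persistent_topic, 0)) # => true

fresh = ConsumerCache.fetch(transient_topic, 0)
puts fresh.equal?(ConsumerCache.fetch(transient_topic, 0))  # => false
```

In both branches the most recent instance stays in the hash, which matches the comment in the listing about always storing a current instance.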