Class: Karafka::Processing::InlineInsights::Tracker
- Inherits: Object
- Extended by:
- Forwardable
- Includes:
- Core::Helpers::Time, Singleton
- Defined in:
- lib/karafka/processing/inline_insights/tracker.rb
Overview
We include a cache of 5 minutes for revoked partitions to compensate for cases where, when using LRJ, lost partition data would no longer be present while we are still in the processing phase. Since those metrics are published with each `poll`, regular processing is not subject to this issue. For LRJ we keep the reference. The only case where this could be switched midway is when LRJ keeps running for an extended period of time after an involuntary revocation. A time-based cache, instead of explicit tracking, simplifies the design, as we do not have to manage state, especially since we would also have to track operations running in a revoked state.
This tracker keeps in memory data about all topics and partitions that it encounters because in case of routing patterns, we may start getting statistics prior to registering given topic via dynamic routing expansions. In such case we would not have insights where they were actually available for us to use.
Memory usage is negligible as long as we evict expired data. A single metrics set for a single partition contains around 4KB of data. This means that with an assignment of 1000 partitions, we use around 4MB of space for tracking those metrics.
Object used to track statistics coming from librdkafka in a way that is accessible to the consumers.
We use a single tracker because we do not need state management here, as our consumer group clients, identified by the statistics name value, are unique. On top of that, having one per-process singleton allows us to easily use the tracker also from other places, like the filtering API.
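The 5-minute time-based eviction described above can be sketched in plain Ruby. This is a minimal illustration, not Karafka's implementation: `TtlTracker`, `TTL_MS`, and the hash payloads are all assumptions, and the real tracker also filters partitions and namespaces keys by consumer group.

```ruby
require 'singleton'

# Minimal sketch of a TTL-based metrics cache: each write stamps the entry
# with a monotonic timestamp and evicts entries older than the TTL.
class TtlTracker
  include Singleton

  # Assumed 5-minute window, expressed in milliseconds
  TTL_MS = 5 * 60 * 1_000

  def initialize
    @accu = {}
    @mutex = Mutex.new
  end

  # Stores the details under the given key and evicts stale entries
  def add(key, details)
    @mutex.synchronize do
      @accu[key] = [monotonic_now, details]
      evict
    end
  end

  # Returns the stored details or an empty hash on a miss
  def find(key)
    @accu.fetch(key, []).last || {}
  end

  private

  # Monotonic clock in milliseconds, immune to wall-clock adjustments
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
  end

  # Drops entries not refreshed within the TTL window
  def evict
    now = monotonic_now
    @accu.delete_if { |_key, (stored_at, _details)| now - stored_at > TTL_MS }
  end
end
```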
Instance Method Summary
-
#add(consumer_group_id, statistics) ⇒ Object
Adds each partition statistics into internal accumulator.
-
#clear ⇒ Object
Clears the tracker.
-
#find(topic, partition) ⇒ Hash
Finds statistics about the requested consumer group topic partition.
-
#initialize ⇒ Tracker
constructor
Initializes the tracker with empty accumulator.
Constructor Details
#initialize ⇒ Tracker
Initializes the tracker with empty accumulator
# File 'lib/karafka/processing/inline_insights/tracker.rb', line 55

def initialize
  @accu = {}
  @mutex = Mutex.new
end
Instance Method Details
#add(consumer_group_id, statistics) ⇒ Object
Adds each partition's statistics into the internal accumulator. A single statistics set may contain data from multiple topics and their partitions, because a single client can operate on multiple topics and partitions.
We iterate over those topics and partitions and store only the topic partition data.
# File 'lib/karafka/processing/inline_insights/tracker.rb', line 69

def add(consumer_group_id, statistics)
  @mutex.synchronize do
    statistics.fetch('topics', EMPTY_HASH).each do |topic_name, t_details|
      t_details.fetch('partitions', EMPTY_HASH).each do |partition_id, p_details|
        next unless track?(partition_id, p_details)

        key = "#{consumer_group_id}_#{topic_name}_#{partition_id}"
        @accu[key] = [monotonic_now, p_details]
      end
    end

    evict
  end
end
#clear ⇒ Object
Clears the tracker
# File 'lib/karafka/processing/inline_insights/tracker.rb', line 99

def clear
  @mutex.synchronize { @accu.clear }
end
#find(topic, partition) ⇒ Hash
We do not enclose it in a mutex mainly because the only race condition that could happen here is a miss, which we need to support anyway due to how librdkafka ships metrics and the potential removal of data on heavily revoked LRJ.
Finds statistics about the requested consumer group topic partition.
# File 'lib/karafka/processing/inline_insights/tracker.rb', line 93

def find(topic, partition)
  key = "#{topic.consumer_group.id}_#{topic.name}_#{partition}"
  @accu.fetch(key, EMPTY_ARRAY).last || EMPTY_HASH
end
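The lock-free read path is easy to see in isolation: a miss falls through to a frozen empty hash, so callers never have to handle `nil`. A minimal sketch, assuming the frozen `EMPTY_ARRAY` / `EMPTY_HASH` constants Karafka uses and a hand-built accumulator entry in the `[monotonic_timestamp, details]` shape stored by `#add`:

```ruby
EMPTY_ARRAY = [].freeze
EMPTY_HASH = {}.freeze

# Accumulator entry as stored by #add: [monotonic_timestamp, details]
accu = { 'cg1_events_0' => [123.45, { 'consumer_lag' => 10 }] }

# Mirrors the #find body: fetch the [timestamp, details] pair (or an
# empty array on a miss), take the details, and fall back to EMPTY_HASH.
find = lambda do |key|
  accu.fetch(key, EMPTY_ARRAY).last || EMPTY_HASH
end
```

Because `EMPTY_ARRAY.last` is `nil`, the `||` fallback makes a miss and a present-but-empty entry look the same to the caller, which matches the miss tolerance described in the note above.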