Class: Karafka::Processing::InlineInsights::Tracker
- Inherits:
- Object
- Object
- Karafka::Processing::InlineInsights::Tracker
- Extended by:
- Forwardable
- Includes:
- Singleton
- Defined in:
- lib/karafka/processing/inline_insights/tracker.rb
Overview
Object used to t
We use a single tracker because no extra state management is needed here: consumer group clients are identified by their statistics name value, which is unique. On top of that, a per-process singleton makes the tracker easy to use from other places as well, such as the filtering API.
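The per-process singleton idea described above can be sketched as follows. This is a minimal illustration, not the Karafka class: `SketchTracker` and its `group_count` helper are hypothetical names, and the class-level delegation through `Forwardable` is an assumption about one way to make the singleton callable without writing `.instance` at every call site.

```ruby
require 'singleton'
require 'forwardable'

# Hypothetical stand-in that mirrors the documented shape of the tracker
class SketchTracker
  include Singleton

  class << self
    extend Forwardable

    # Forward class-level calls to the single per-process instance
    def_delegators :instance, :add, :clear, :group_count
  end

  def initialize
    # consumer group id => { client name => statistics }
    @accu = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
  end

  def add(consumer_group_id, statistics)
    @mutex.synchronize do
      @accu[consumer_group_id][statistics.fetch('name')] = statistics
    end
  end

  def clear
    @mutex.synchronize { @accu.clear }
  end

  # Hypothetical helper, present only so the sketch has something to inspect
  def group_count
    @mutex.synchronize { @accu.size }
  end
end

# Two clients of the same consumer group end up under one group key
SketchTracker.add('group-a', { 'name' => 'client-1' })
SketchTracker.add('group-a', { 'name' => 'client-2' })
```

Because every caller reaches the same instance, code elsewhere in the process sees the same accumulated data without passing a tracker reference around.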
Instance Method Summary
-
#add(consumer_group_id, statistics) ⇒ Object
Adds client statistics into internal accumulator.
-
#clear ⇒ Object
Clears the tracker.
-
#exists?(topic, partition) ⇒ Boolean
True if statistics for the given topic partition exist, otherwise false.
-
#find(topic, partition) ⇒ Hash
Finds statistics about the requested consumer group topic partition.
-
#initialize ⇒ Tracker
constructor
A new instance of Tracker.
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 27
    def initialize
      @accu = Hash.new { |h, k| h[k] = {} }
      @mutex = Mutex.new
    end
Instance Method Details
#add(consumer_group_id, statistics) ⇒ Object
Adds client statistics into the internal accumulator. A single statistics set may contain data from multiple topics and their partitions, because a single client can operate on multiple topics and partitions. This is why, during a `#find` request, we locate the appropriate data from within this set of metrics.

    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 40
    def add(consumer_group_id, statistics)
      @mutex.synchronize do
        @accu[consumer_group_id][statistics.fetch('name')] = statistics
      end
    end
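To make the accumulator layout concrete, here is a small sketch that uses a plain hash in place of the tracker. The statistics payloads are hypothetical and heavily trimmed; only the `'name'`, `'topics'` and `'partitions'` keys are taken from the code on this page, and `'consumer_lag'` is used purely as a sample metric.

```ruby
# Nested accumulator: consumer group id => client name => latest statistics
accu = Hash.new { |h, k| h[k] = {} }

# Two consecutive emissions from the same client (hypothetical payloads)
first = {
  'name' => 'client-1',
  'topics' => { 'events' => { 'partitions' => { '0' => { 'consumer_lag' => 10 } } } }
}
second = {
  'name' => 'client-1',
  'topics' => { 'events' => { 'partitions' => { '0' => { 'consumer_lag' => 3 } } } }
}

# Same keying as in #add: a newer emission replaces the older one for that client
[first, second].each do |statistics|
  accu['group-a'][statistics.fetch('name')] = statistics
end

latest_lag = accu['group-a']['client-1'].dig('topics', 'events', 'partitions', '0', 'consumer_lag')
```

Keying by client name means the accumulator holds only the most recent statistics set per client, so it does not grow with every emission.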
#clear ⇒ Object
Clears the tracker.

    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 79
    def clear
      @mutex.synchronize { @accu.clear }
    end
#exists?(topic, partition) ⇒ Boolean
Returns true if statistics for the given topic partition exist, otherwise false.

    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 74
    def exists?(topic, partition)
      !find(topic, partition).empty?
    end
#find(topic, partition) ⇒ Hash
Finds statistics about the requested consumer group topic partition. Returns an empty hash when no statistics are available for it.

Note: this lookup is intentionally not wrapped in the mutex. The only race condition that could happen here is a miss, and misses need to be supported anyway because of how librdkafka ships metrics.

    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 55
    def find(topic, partition)
      @accu
        .fetch(topic.consumer_group.id, EMPTY_HASH)
        .each_value do |statistics|
          result = statistics
                   .fetch('topics', EMPTY_HASH)
                   .fetch(topic.name, EMPTY_HASH)
                   .fetch('partitions', EMPTY_HASH)
                   .fetch(partition.to_s, false)

          return result if result
        end

      EMPTY_HASH
    end
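The lookup and its miss behaviour can be tried outside of Karafka with a few stand-ins. In this sketch `SketchConsumerGroup`, `SketchTopic` and `find_partition_stats` are hypothetical names, the accumulator is a plain hash, and the statistics payload is a trimmed, made-up example; only the chain of `fetch` calls follows the method shown above.

```ruby
EMPTY_HASH = {}.freeze

# Hypothetical stand-ins for the routing objects that #find receives
SketchConsumerGroup = Struct.new(:id)
SketchTopic = Struct.new(:name, :consumer_group)

# Same traversal as #find, with the accumulator passed in explicitly
def find_partition_stats(accu, topic, partition)
  accu.fetch(topic.consumer_group.id, EMPTY_HASH).each_value do |statistics|
    result = statistics
             .fetch('topics', EMPTY_HASH)
             .fetch(topic.name, EMPTY_HASH)
             .fetch('partitions', EMPTY_HASH)
             .fetch(partition.to_s, false)

    # The first client that reports this partition wins
    return result if result
  end

  # Nothing reported for this partition (yet)
  EMPTY_HASH
end

accu = {
  'group-a' => {
    'client-1' => {
      'name' => 'client-1',
      'topics' => { 'events' => { 'partitions' => { '0' => { 'consumer_lag' => 5 } } } }
    }
  }
}

topic = SketchTopic.new('events', SketchConsumerGroup.new('group-a'))

hit = find_partition_stats(accu, topic, 0)   # partition present in the statistics
miss = find_partition_stats(accu, topic, 7)  # partition not reported by any client
```

A miss yields the shared empty hash rather than `nil`, which is why `#exists?` can be expressed as a simple `empty?` check on the result.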