Class: Karafka::Processing::InlineInsights::Tracker

Inherits:
Object
Extended by:
Forwardable
Includes:
Singleton
Defined in:
lib/karafka/processing/inline_insights/tracker.rb

Overview

Object used to track statistics coming from librdkafka in a way that makes them accessible to consumers.

We use a single tracker because no state management is needed here: consumer group clients are identified by the statistics name value, which is unique. In addition, a per-process singleton makes the tracker easy to use from other places, such as the filtering API.
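The per-process singleton arrangement can be sketched roughly as below. This is a simplified stand-in and not the Karafka implementation: `MiniTracker`, its `size` helper, and the class-level delegators are hypothetical names introduced only for illustration.

```ruby
require 'singleton'
require 'forwardable'

# Hypothetical stand-in showing a per-process singleton tracker.
class MiniTracker
  include Singleton

  class << self
    extend Forwardable

    # Assumed convenience: forward class-level calls to the single instance.
    def_delegators :instance, :add, :clear, :size
  end

  def initialize
    # consumer group id => { client name => statistics }
    @accu = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
  end

  def add(consumer_group_id, statistics)
    @mutex.synchronize do
      @accu[consumer_group_id][statistics.fetch('name')] = statistics
    end
  end

  def clear
    @mutex.synchronize { @accu.clear }
  end

  # Illustration-only helper: number of tracked consumer groups.
  def size
    @accu.size
  end
end

# Both call styles reach the same process-wide object.
MiniTracker.add('group-a', { 'name' => 'client-1' })
MiniTracker.instance.add('group-b', { 'name' => 'client-2' })
```

Because every caller goes through the same instance, code outside a consumer can read the same data without passing a tracker reference around.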

Instance Method Summary

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 27

def initialize
  @accu = Hash.new { |h, k| h[k] = {} }
  @mutex = Mutex.new
end

Instance Method Details

#add(consumer_group_id, statistics) ⇒ Object

Adds client statistics to the internal accumulator. A single statistics set may contain data from multiple topics and their partitions, because a single client can operate on multiple topics and partitions. This is why `#find` locates the appropriate data from within this set of metrics.

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which statistics were emitted.

  • statistics (Hash)

    librdkafka enriched statistics



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 40

def add(consumer_group_id, statistics)
  @mutex.synchronize do
    @accu[consumer_group_id][statistics.fetch('name')] = statistics
  end
end
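The resulting accumulator layout can be illustrated with plain Ruby objects. This is a sketch under stated assumptions: the `add` lambda mirrors the method body above, and the sample payloads are made up (real librdkafka statistics carry many more keys).

```ruby
# Accumulator layout: consumer group id => { client name => statistics }
accu = Hash.new { |h, k| h[k] = {} }
mutex = Mutex.new

# Illustration-only lambda with the same body as #add.
add = lambda do |consumer_group_id, statistics|
  mutex.synchronize do
    accu[consumer_group_id][statistics.fetch('name')] = statistics
  end
end

# Two clients of the same consumer group are stored side by side.
add.call('group-a', { 'name' => 'client-1', 'topics' => { 'events' => {} } })
add.call('group-a', { 'name' => 'client-2', 'topics' => { 'logs' => {} } })

# A newer emission from the same client replaces its previous entry.
add.call('group-a', { 'name' => 'client-1', 'topics' => { 'events' => {}, 'audit' => {} } })

client_names = accu['group-a'].keys
```

Only the latest statistics per client are kept, so the accumulator grows with the number of clients rather than with the number of emissions.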

#clear ⇒ Object

Clears the tracker



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 79

def clear
  @mutex.synchronize { @accu.clear }
end

#exists?(topic, partition) ⇒ Boolean

Returns true if statistics of a given topic partition exist, otherwise false.

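Parameters:

  • topic (Karafka::Routing::Topic)

    routing topic whose name and consumer group id are used for the lookup

  • partition (Integer)

    partition id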

Returns:

  • (Boolean)

    true if statistics of a given topic partition exist, otherwise false



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 74

def exists?(topic, partition)
  !find(topic, partition).empty?
end

#find(topic, partition) ⇒ Hash

Note:

This lookup is not wrapped in a mutex. The only race condition possible here is a miss, and misses have to be handled anyway because of how librdkafka ships metrics.

Finds statistics about requested consumer group topic partition

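Parameters:

  • topic (Karafka::Routing::Topic)

    routing topic whose name and consumer group id are used for the lookup

  • partition (Integer)

    partition id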

Returns:

  • (Hash)

    hash with given topic partition statistics or empty hash if not present



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 55

def find(topic, partition)
  @accu
    .fetch(topic.consumer_group.id, EMPTY_HASH)
    .each_value do |statistics|
      result = statistics
               .fetch('topics', EMPTY_HASH)
               .fetch(topic.name, EMPTY_HASH)
               .fetch('partitions', EMPTY_HASH)
               .fetch(partition.to_s, false)

      return result if result
    end

  EMPTY_HASH
end
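The lookup can be exercised outside Karafka with a small self-contained sketch. `ConsumerGroup`, `Topic`, and `find_in` are hypothetical stand-ins for the routing objects and for `#find`, and the statistics values are made up for illustration.

```ruby
EMPTY_HASH = {}.freeze

# Hypothetical stand-ins for the routing objects that #find reads from.
ConsumerGroup = Struct.new(:id)
Topic = Struct.new(:name, :consumer_group)

# Same lookup as #find, written as a free-standing method over an explicit accumulator.
def find_in(accu, topic, partition)
  accu
    .fetch(topic.consumer_group.id, EMPTY_HASH)
    .each_value do |statistics|
      result = statistics
               .fetch('topics', EMPTY_HASH)
               .fetch(topic.name, EMPTY_HASH)
               .fetch('partitions', EMPTY_HASH)
               .fetch(partition.to_s, false)

      return result if result
    end

  EMPTY_HASH
end

# Made-up statistics for two clients in one consumer group.
accu = {
  'group-a' => {
    'client-1' => { 'topics' => { 'events' => { 'partitions' => { '0' => { 'consumer_lag' => 5 } } } } },
    'client-2' => { 'topics' => { 'events' => { 'partitions' => { '1' => { 'consumer_lag' => 0 } } } } }
  }
}

topic = Topic.new('events', ConsumerGroup.new('group-a'))

hit = find_in(accu, topic, 1)  # the partition is matched by its string form
miss = find_in(accu, topic, 7) # an unknown partition falls back to the empty hash
```

Callers therefore always receive a hash and can treat an empty one as "no statistics yet", which is the same check `#exists?` performs.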