Class: Karafka::Routing::Features::InlineInsights

Inherits:
Base
  • Object
Defined in:
lib/karafka/routing/features/inline_insights.rb,
lib/karafka/routing/features/inline_insights/topic.rb,
lib/karafka/routing/features/inline_insights/config.rb,
lib/karafka/routing/features/inline_insights/contracts/topic.rb

Overview

Feature that exposes, during consumption, the metrics of the particular partition a consumer is operating on. It is useful for building context-aware consumers that change their behaviour based on lag and other parameters.
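
As a rough illustration of the "context-aware" idea, the sketch below picks a processing mode from a per-partition metrics hash. It is plain Ruby and does not use Karafka itself: the `LagAwareMode` class, the `LAG_THRESHOLD` value, and the mode names are hypothetical, and the `'consumer_lag'` key is assumed to mirror the partition-level field of the same name in librdkafka statistics.

```ruby
# Hypothetical helper, not part of Karafka: chooses a processing mode
# based on a per-partition insights hash.
class LagAwareMode
  # Assumed threshold, chosen only for illustration
  LAG_THRESHOLD = 1_000

  # @param insights [Hash, nil] partition metrics; may be empty or nil
  #   when no statistics have been collected yet
  def initialize(insights)
    @insights = insights || {}
  end

  # @return [Symbol] :catch_up when the lag exceeds the threshold,
  #   :normal otherwise (including when no metrics are available)
  def mode
    return :normal if @insights.empty?

    lag = @insights.fetch('consumer_lag', 0)
    lag > LAG_THRESHOLD ? :catch_up : :normal
  end
end

puts LagAwareMode.new({ 'consumer_lag' => 5_000 }).mode
puts LagAwareMode.new({}).mode
```

Falling back to the normal mode when the hash is empty reflects that metrics may not be available for a partition at the time a batch is processed.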

Defined Under Namespace

Modules: Contracts, Topic
Classes: Config

Class Method Summary

Methods inherited from Base

activate, load_all, post_setup_all, pre_setup_all

Class Method Details

.post_setup(_config) ⇒ Object

Installs the statistics listener and initializes the tracker, but only if at least one active topic uses inline insights.

Parameters:

  • _config (Karafka::Core::Configurable::Node)

    app config



# File 'lib/karafka/routing/features/inline_insights.rb', line 14

def post_setup(_config)
  ::Karafka::App.monitor.subscribe('app.running') do
    # Do not activate tracking of statistics if none of our active topics uses it
    # This prevents us from tracking metrics when user just runs a subset of topics
    # in a given process and none of those actually utilizes this feature
    next unless ::Karafka::App
                .subscription_groups
                .values
                .flat_map(&:itself)
                .flat_map(&:topics)
                .flat_map(&:to_a)
                .any?(&:inline_insights?)

    # Initialize the tracker prior to becoming multi-threaded
    ::Karafka::Processing::InlineInsights::Tracker.instance

    # Subscribe to the statistics reports and collect them
    ::Karafka.monitor.subscribe(
      ::Karafka::Processing::InlineInsights::Listener.new
    )
  end
end
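
The guard at the top of the subscription block can be tried in isolation. The sketch below reproduces the same `flat_map` chain against stand-in objects. `Topic`, `TopicsCollection`, `SubscriptionGroup`, and `inline_insights_used?` are hypothetical stubs written for this example, not Karafka classes. They only assume what the chain itself implies: a hash whose values are arrays of subscription groups, each exposing a `topics` collection that responds to `to_a`.

```ruby
# Hypothetical stand-ins for the routing objects, for illustration only
Topic = Struct.new(:name, :insights_enabled) do
  def inline_insights?
    insights_enabled
  end
end

# A collection that is not an Array, so flat_map(&:topics) leaves it intact
# and the following flat_map(&:to_a) is what unpacks the topics
class TopicsCollection
  def initialize(topics)
    @topics = topics
  end

  def to_a
    @topics.dup
  end
end

SubscriptionGroup = Struct.new(:topics)

# Same chain as in post_setup, applied to a plain hash of
# consumer group => array of subscription groups
def inline_insights_used?(subscription_groups)
  subscription_groups
    .values
    .flat_map(&:itself)
    .flat_map(&:topics)
    .flat_map(&:to_a)
    .any?(&:inline_insights?)
end

groups = {
  'group_a' => [
    SubscriptionGroup.new(TopicsCollection.new([Topic.new('events', false)]))
  ],
  'group_b' => [
    SubscriptionGroup.new(TopicsCollection.new([Topic.new('orders', true)]))
  ]
}

puts inline_insights_used?(groups)
```

When no topic reports `inline_insights?` as true, the chain returns false and, in the method above, `next` leaves the block before the tracker or the listener is created.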