Class: Karafka::App

Inherits:
Object
  • Object
show all
Extended by:
Setup::Dsl
Defined in:
lib/karafka/app.rb

Overview

App class

Class Method Summary collapse

Methods included from Setup::Dsl

config, setup

Class Method Details

.consumer_groupsKarafka::Routing::Builder Also known as: routes

Returns consumers builder instance alias.

Returns:



10
11
12
13
14
15
# File 'lib/karafka/app.rb', line 10

def consumer_groups
  config
    .internal
    .routing
    .builder
end

.done?Boolean

Note:

It is a meta status from the status object

Returns true if we should be done in general with processing anything.

Returns:

  • (Boolean)

    true if we should be done in general with processing anything



58
59
60
# File 'lib/karafka/app.rb', line 58

def done?
  App.config.internal.status.done?
end

.subscription_groupsHash

Returns active subscription groups grouped based on consumer group in a hash.

Returns:

  • (Hash)

    active subscription groups grouped based on consumer group in a hash



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/karafka/app.rb', line 18

def subscription_groups
  # We first build all the subscription groups, so they all get the same position, despite
  # later narrowing that. It allows us to maintain same position number for static members
  # even when we want to run subset of consumer groups or subscription groups
  #
  # We then narrow this to active consumer groups from which we select active subscription
  # groups.
  consumer_groups
    .map { |cg| [cg, cg.subscription_groups] }
    .select { |cg, _| cg.active? }
    .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } }
    .delete_if { |_, sgs| sgs.empty? }
    .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |top| !top.active? } } }
    .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } }
    .reject { |cg, _| cg.subscription_groups.empty? }
    .to_h
end