Class: Karafka::Routing::Builder

Inherits:
Concurrent::Array
  • Object
show all
Defined in:
lib/karafka/routing/builder.rb

Overview

Builder used as a DSL layer for building consumers and telling them which topics to consume

Examples:

Build a simple (most common) route

consumers do
  topic :new_videos do
    consumer NewVideosConsumer
  end
end

Instance Method Summary collapse

Constructor Details

#initializeBuilder

Returns a new instance of Builder.



18
19
20
21
# File 'lib/karafka/routing/builder.rb', line 18

def initialize
  super
  @draws = Concurrent::Array.new
end

Instance Method Details

#activeArray<Karafka::Routing::ConsumerGroup>

Returns only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context.

Returns:

  • (Array<Karafka::Routing::ConsumerGroup>)

    only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context



52
53
54
# File 'lib/karafka/routing/builder.rb', line 52

def active
  select(&:active?)
end

#clearObject

Clears the builder and the draws memory



57
58
59
60
# File 'lib/karafka/routing/builder.rb', line 57

def clear
  @draws.clear
  super
end

#draw(&block) { ... } ⇒ Object

Note:

After it is done drawing it will store and validate all the routes to make sure that they are correct and that there are no topic/group duplications (this is forbidden)

Used to draw routes for Karafka

Examples:

draw do
  topic :xyz do
  end
end

Parameters:

  • block (Proc)

    block we will evaluate within the builder context

Yields:

  • Evaluates provided block in a builder context so we can describe routes

Raises:



35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/karafka/routing/builder.rb', line 35

def draw(&block)
  @draws << block

  instance_eval(&block)

  each do |consumer_group|
    hashed_group = consumer_group.to_h
    validation_result = CONTRACT.call(hashed_group)
    next if validation_result.success?

    raise Errors::InvalidConfigurationError, validation_result.errors.to_h
  end
end

#reloadObject

Note:

This won’t allow registration of new topics without process restart but will trigger cache invalidation so all the classes, etc are re-fetched after code reload

Redraws all the routes for the in-process code reloading.



65
66
67
68
69
# File 'lib/karafka/routing/builder.rb', line 65

def reload
  draws = @draws.dup
  clear
  draws.each { |block| draw(&block) }
end