Module: Karafka::Connection::ConfigAdapter

Defined in:
lib/karafka/connection/config_adapter.rb

Overview

Note:

The good thing about Kafka.new method is that it ignores all options that do nothing. So we don’t have to worry about injecting our internal settings into the client and breaking stuff

Mapper used to convert our internal settings into ruby-kafka settings Since ruby-kafka has more and more options and there are few “levels” on which we have to apply them (despite the fact, that in Karafka you configure all of it in one place), we have to remap it into what ruby-kafka driver requires

Class Method Summary collapse

Class Method Details

.client(_consumer_group) ⇒ Hash

Builds all the configuration settings for Kafka.new method

Parameters:

Returns:

  • (Hash)

    hash with all the settings required by Kafka.new method



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/karafka/connection/config_adapter.rb', line 18

def client(_consumer_group)
  # This one is a default that takes all the settings except special
  # cases defined in the map
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each do |setting_name, setting_value|
    # All options for config adapter should be ignored as we're just interested
    # in what is left, as we want to pass all the options that are "typical"
    # and not listed in the config_adapter special cases mapping. All the values
    # from the config_adapter mapping go somewhere else, not to the client directly
    next if AttributesMap.config_adapter.values.flatten.include?(setting_name)

    settings[setting_name] = setting_value
  end

  sanitize(settings)
end

.consumer(consumer_group) ⇒ Hash

Builds all the configuration settings for kafka#consumer method

Parameters:

Returns:

  • (Hash)

    hash with all the settings required by Kafka#consumer method



42
43
44
45
46
# File 'lib/karafka/connection/config_adapter.rb', line 42

def consumer(consumer_group)
  settings = { group_id: consumer_group.id }
  settings = fetch_for(:consumer, consumer_group, settings)
  sanitize(settings)
end

.consuming(consumer_group) ⇒ Hash

Builds all the configuration settings for kafka consumer consume_each_batch and

consume_each_message methods

Parameters:

Returns:

  • (Hash)

    hash with all the settings required by Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch method



53
54
55
56
57
58
# File 'lib/karafka/connection/config_adapter.rb', line 53

def consuming(consumer_group)
  settings = {
    automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
  }
  sanitize(fetch_for(:consuming, consumer_group, settings))
end

.pausing(consumer_group) ⇒ Hash

Builds all the configuration settings required by kafka consumer#pause method

Parameters:

Returns:

  • (Hash)

    hash with all the settings required to pause kafka consumer



71
72
73
# File 'lib/karafka/connection/config_adapter.rb', line 71

def pausing(consumer_group)
  { timeout: consumer_group.pause_timeout }
end

.subscription(topic) ⇒ Hash

Builds all the configuration settings for kafka consumer#subscribe method

Parameters:

Returns:

  • (Hash)

    hash with all the settings required by kafka consumer#subscribe method



63
64
65
66
# File 'lib/karafka/connection/config_adapter.rb', line 63

def subscription(topic)
  settings = fetch_for(:subscription, topic)
  [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
end