Module: Karafka::AttributesMap

Defined in:
lib/karafka/attributes_map.rb

Overview

Note:

Settings presented here cover all the settings that are being used across Karafka

Both Karafka and Ruby-Kafka contain a lot of settings that can be applied on multiple levels. In Karafka that is on consumer group and on the topic level. In Ruby-Kafka it is on consumer, subscription and consumption levels. In order to maintain an order in managing those settings, this module was created. It contains details on what setting where should go and which layer (both on Karafka and Ruby-Kafka) is responsible for setting it and sending it forward

Class Method Summary collapse

Class Method Details

.api_adapterHash

Note:

All other settings will be passed to Kafka.new method invocation. All elements in this hash are just edge cases

What settings should go where in ruby-kafka

Returns:

  • (Hash)

    hash with proper sections on what to proxy where in Ruby-Kafka



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/karafka/attributes_map.rb', line 17

def api_adapter
  {
    consumer: %i[
      session_timeout offset_commit_interval offset_commit_threshold
      offset_retention_time heartbeat_interval fetcher_max_queue_size
      assignment_strategy
    ],
    subscribe: %i[start_from_beginning max_bytes_per_partition],
    consumption: %i[min_bytes max_bytes max_wait_time],
    pause: %i[pause_timeout pause_max_timeout pause_exponential_backoff],
    # All the options that are under kafka config namespace, but are not used
    # directly with kafka api, but from the Karafka user perspective, they are
    # still related to kafka. They should not be proxied anywhere
    ignored: %i[reconnect_timeout automatically_mark_as_consumed]
  }
end

.consumer_groupArray<Symbol>

Note:

Note that there are settings directly extracted from the config kafka namespace I did this that way, so I won’t have to repeat same setting keys over and over again Thanks to this solution, if any new setting is available for ruby-kafka, we just need to add it to our configuration class and it will be handled automatically.

Returns properties that can be set on a per consumer group level.

Returns:

  • (Array<Symbol>)

    properties that can be set on a per consumer group level



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/karafka/attributes_map.rb', line 50

def consumer_group
  # @note We don't ignore the api_adapter[:ignored] values as they should be ignored
  #   only when proxying details go ruby-kafka. We use ignored fields internally in karafka
  ignored_settings = api_adapter[:subscribe]
  defined_settings = api_adapter.values.flatten
  karafka_settings = %i[batch_fetching]

  dynamically_proxied = Karafka::Setup::Config.config.kafka.to_h.keys

  (defined_settings + dynamically_proxied).uniq + karafka_settings - ignored_settings
end

.topicArray<Symbol>

Returns properties that can be set on a per topic level.

Returns:

  • (Array<Symbol>)

    properties that can be set on a per topic level



35
36
37
38
39
40
41
42
43
# File 'lib/karafka/attributes_map.rb', line 35

def topic
  (api_adapter[:subscribe] + %i[
    backend
    name
    deserializer
    responder
    batch_consuming
  ]).uniq
end