Module: Karafka::Connection::ConfigAdapter
- Defined in: lib/karafka/connection/config_adapter.rb
Overview
The good thing about the Kafka.new method is that it ignores all options that do nothing, so we don't have to worry about injecting our internal settings into the client and breaking stuff.
Mapper used to convert our internal settings into ruby-kafka settings. Since ruby-kafka has more and more options and there are a few "levels" on which we have to apply them (despite the fact that in Karafka you configure all of it in one place), we have to remap them into what the ruby-kafka driver requires.
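To illustrate the remapping described above, here is a minimal sketch that splits one flat settings hash into per-level hashes. The `levels` map, its contents, and the `remap` helper are hypothetical stand-ins for this example; the real mapping lives in Karafka's AttributesMap and may differ.

```ruby
# Hypothetical map of "levels" to the setting names that belong to each one.
# These names are illustrative only, not Karafka's actual AttributesMap.
def levels
  {
    consumer: %i[session_timeout offset_commit_interval],
    subscription: %i[start_from_beginning max_bytes_per_partition],
    consuming: %i[min_bytes max_wait_time]
  }
end

# Splits a flat settings hash into one hash per level.
# Settings not claimed by any level are grouped under :client.
def remap(flat_settings)
  claimed = levels.values.flatten
  result = levels.transform_values { |names| flat_settings.slice(*names) }
  result[:client] = flat_settings.reject { |name, _| claimed.include?(name) }
  result
end

flat = {
  session_timeout: 30,
  start_from_beginning: true,
  min_bytes: 1,
  connect_timeout: 10
}

remapped = remap(flat)
```

Each key of `remapped` then corresponds to one place where the driver expects options, while the user still configures everything in a single flat hash.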
Class Method Summary
- .client(_consumer_group) ⇒ Hash
  Builds all the configuration settings for the Kafka.new method.
- .consumer(consumer_group) ⇒ Hash
  Builds all the configuration settings for the kafka#consumer method.
- .consuming(consumer_group) ⇒ Hash
  Builds all the configuration settings for the kafka consumer consume_each_batch and consume_each_message methods.
- .pausing(consumer_group) ⇒ Hash
  Builds all the configuration settings required by the kafka consumer#pause method.
- .subscription(topic) ⇒ Hash
  Builds all the configuration settings for the kafka consumer#subscribe method.
Class Method Details
.client(_consumer_group) ⇒ Hash
Builds all the configuration settings for Kafka.new method
    # File 'lib/karafka/connection/config_adapter.rb', line 18

    def client(_consumer_group)
      # This one is a default that takes all the settings except special
      # cases defined in the map
      settings = {
        logger: ::Karafka.logger,
        client_id: ::Karafka::App.config.client_id
      }

      kafka_configs.each do |setting_name, setting_value|
        # All options for config adapter should be ignored as we're just interested
        # in what is left, as we want to pass all the options that are "typical"
        # and not listed in the config_adapter special cases mapping. All the values
        # from the config_adapter mapping go somewhere else, not to the client directly
        next if AttributesMap.config_adapter.values.flatten.include?(setting_name)

        settings[setting_name] = setting_value
      end

      sanitize(settings)
    end
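The filtering step in `.client` can be tried in isolation. The sketch below is a simplified illustration rather than Karafka's implementation: `special_cases` is a hypothetical stand-in for `AttributesMap.config_adapter`, and `client_settings` is a name introduced only for this example.

```ruby
# Hypothetical stand-in for AttributesMap.config_adapter: setting names that
# are routed to other ruby-kafka calls instead of Kafka.new.
def special_cases
  {
    consumer: %i[session_timeout heartbeat_interval],
    pausing: %i[pause_timeout]
  }
end

# Keeps only the settings that are not listed in any special-case group,
# mirroring the `next if ...include?(setting_name)` filter in .client.
def client_settings(kafka_configs, defaults)
  excluded = special_cases.values.flatten
  kafka_configs.each_with_object(defaults.dup) do |(name, value), settings|
    next if excluded.include?(name)

    settings[name] = value
  end
end

configs = { session_timeout: 30, pause_timeout: 10, connect_timeout: 5 }
result = client_settings(configs, { client_id: 'example_app' })
```

Here only `connect_timeout` is left over for the client, alongside the seeded `client_id`. The sketch computes the excluded list once before the loop, which is a small design choice of this example and not something the original method does.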
.consumer(consumer_group) ⇒ Hash
Builds all the configuration settings for kafka#consumer method
    # File 'lib/karafka/connection/config_adapter.rb', line 42

    def consumer(consumer_group)
      settings = { group_id: consumer_group.id }
      settings = fetch_for(:consumer, consumer_group, settings)
      sanitize(settings)
    end
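The `fetch_for` and `sanitize` helpers are not shown on this page, so the following sketch only assumes their behaviour: that one copies the allowed attributes from the consumer group into a pre-seeded hash, and that the other drops nil values so the driver's defaults apply. The names `fetch_settings`, `drop_nils`, and `group_class` are hypothetical.

```ruby
# Hypothetical stand-in for a consumer group; only the attributes used here.
group_class = Struct.new(:id, :session_timeout, :heartbeat_interval)

# Assumed behaviour of a fetch_for-like helper: copy the allowed attributes
# from the route layer unless they were already set in the seeded hash.
def fetch_settings(allowed, layer, settings)
  allowed.each do |name|
    next if settings.key?(name)

    settings[name] = layer.public_send(name)
  end
  settings
end

# Assumed behaviour of a sanitize-like helper: drop nil values so that the
# driver's own defaults can take over.
def drop_nils(settings)
  settings.reject { |_, value| value.nil? }
end

group = group_class.new('example_group', 30, nil)
seeded = { group_id: group.id }
settings = fetch_settings(%i[session_timeout heartbeat_interval], group, seeded)
consumer_settings = drop_nils(settings)
```

Under these assumptions the unset `heartbeat_interval` disappears from the final hash, while `group_id` and `session_timeout` are kept.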
.consuming(consumer_group) ⇒ Hash
Builds all the configuration settings for the kafka consumer consume_each_batch and consume_each_message methods
    # File 'lib/karafka/connection/config_adapter.rb', line 53

    def consuming(consumer_group)
      settings = {
        automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
      }
      sanitize(fetch_for(:consuming, consumer_group, settings))
    end
.pausing(consumer_group) ⇒ Hash
Builds all the configuration settings required by kafka consumer#pause method
    # File 'lib/karafka/connection/config_adapter.rb', line 71

    def pausing(consumer_group)
      { timeout: consumer_group.pause_timeout }
    end
.subscription(topic) ⇒ Hash
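Builds all the configuration settings for the kafka consumer#subscribe method. Although the signature lists Hash, the method body returns a two-element array: the mapped topic name followed by the settings hash.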
    # File 'lib/karafka/connection/config_adapter.rb', line 63

    def subscription(topic)
      settings = fetch_for(:subscription, topic)
      [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
    end
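Because `.subscription` returns a topic name together with a settings hash, a caller would typically destructure the pair before subscribing. The sketch below shows that pattern with stand-ins only: `subscription_args` and `fake_consumer` are hypothetical, and the `subscribe(topic, **options)` signature of the fake is an assumption made for this example.

```ruby
# Hypothetical stand-in that returns the same shape as .subscription:
# a two-element array of [topic_name, settings_hash], with nils removed.
def subscription_args(topic_name, settings)
  [topic_name, settings.reject { |_, value| value.nil? }]
end

# Minimal fake consumer that records what #subscribe receives.
fake_consumer = Object.new

def fake_consumer.subscribe(topic, **options)
  @calls = (@calls || []) << [topic, options]
end

def fake_consumer.calls
  @calls
end

raw = { start_from_beginning: true, max_bytes_per_partition: nil }

# Destructure the pair, then splat the hash as keyword arguments.
topic, options = subscription_args('events', raw)
fake_consumer.subscribe(topic, **options)
```

Splatting the hash with `**options` keeps the call site independent of which subscription settings happen to be configured.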