Module: Karafka::Connection::ApiAdapter
- Defined in:
- lib/karafka/connection/api_adapter.rb
Overview
A convenient property of the Kafka.new method is that it ignores any options that have no effect, so we can pass our internal settings to the client without worrying about breaking anything.
Mapper used to convert our internal settings into ruby-kafka settings, based on that library's API requirements. ruby-kafka keeps gaining options, and they have to be applied at several different "levels" (even though in Karafka you configure all of them in one place), so we have to remap them into what the ruby-kafka driver requires.
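The remapping idea described above can be sketched roughly as follows. This is an illustration only, not Karafka's implementation: LEVELS, FLAT_SETTINGS, settings_for and client_settings are hypothetical names, and the option names are used purely as examples of settings that belong to different driver calls.

```ruby
# Hypothetical map of which flat settings belong to which driver-level call.
# The real mapping lives in Karafka's AttributesMap; these keys are illustrative.
LEVELS = {
  consumer: %i[session_timeout offset_commit_interval],
  subscribe: %i[start_from_beginning max_bytes_per_partition]
}.freeze

# Flat, single-place configuration, as a user would write it.
FLAT_SETTINGS = {
  seed_brokers: %w[kafka://localhost:9092],
  session_timeout: 30,
  start_from_beginning: true,
  connect_timeout: 10
}.freeze

# Picks only the settings that belong to the given level.
def settings_for(level, flat)
  allowed = LEVELS.fetch(level)
  flat.select { |name, _value| allowed.include?(name) }
end

# Everything not claimed by a specific level is passed to the client.
def client_settings(flat)
  claimed = LEVELS.values.flatten
  flat.reject { |name, _value| claimed.include?(name) }
end
```

With this layout, each driver call receives only the keys it understands, while the user still maintains one flat settings hash.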
Class Method Summary
- .client ⇒ Array<Hash>
  Builds all the configuration settings for Kafka.new method.
- .consumer(consumer_group) ⇒ Array<Hash>
  Builds all the configuration settings for kafka#consumer method.
- .consumption(consumer_group) ⇒ Array<Hash>
  Builds all the configuration settings for kafka consumer consume_each_batch and consume_each_message methods.
- .mark_message_as_processed(params) ⇒ Array
  Remaps topic details taking the topic mapper feature into consideration.
- .pause(topic, partition, consumer_group) ⇒ Array
  Builds all the configuration settings required by kafka consumer#pause method.
- .subscribe(topic) ⇒ Hash
  Builds all the configuration settings for kafka consumer#subscribe method.
Class Method Details
.client ⇒ Array<Hash>
We return an array so that we can inject any arguments we need if the raw driver changes.
Builds all the configuration settings for the Kafka.new method.
# File 'lib/karafka/connection/api_adapter.rb', line 21

def client
  # This one is a default that takes all the settings except special
  # cases defined in the map
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each do |setting_name, setting_value|
    # All options for config adapter should be ignored as we're just interested
    # in what is left, as we want to pass all the options that are "typical"
    # and not listed in the api_adapter special cases mapping. All the values
    # from the api_adapter mapping go somewhere else, not to the client directly
    next if AttributesMap.api_adapter.values.flatten.include?(setting_name)

    settings[setting_name] = setting_value
  end

  settings_hash = sanitize(settings)

  # Normalization for the way Kafka::Client accepts arguments from 0.5.3
  [settings_hash.delete(:seed_brokers), settings_hash]
end
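A minimal sketch of how a caller might consume the array shape returned above. FakeClient, build_client and client_args are hypothetical stand-ins written for this example; they are not ruby-kafka or Karafka APIs, and the way Karafka itself passes these arguments to the driver is not shown here.

```ruby
# Stand-in for the driver's client; it only records what it was given.
FakeClient = Struct.new(:seed_brokers, :options)

# Stand-in constructor with a (seed_brokers, **options) shape.
def build_client(seed_brokers, **options)
  FakeClient.new(seed_brokers, options)
end

# Mirrors the shape returned by .client: [seed_brokers, settings_hash].
def client_args(settings)
  settings_hash = settings.dup
  [settings_hash.delete(:seed_brokers), settings_hash]
end

# Unpack the array explicitly, then pass the hash as keyword arguments.
seed_brokers, options = client_args(
  seed_brokers: %w[kafka://localhost:9092],
  client_id: 'my_app'
)
client = build_client(seed_brokers, **options)
puts client.seed_brokers.inspect
puts client.options.inspect
```

Because the adapter returns a plain array, a change in the driver's positional arguments only requires changing the array built in one place, not every call site.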
.consumer(consumer_group) ⇒ Array<Hash>
Builds all the configuration settings for kafka#consumer method
# File 'lib/karafka/connection/api_adapter.rb', line 49

def consumer(consumer_group)
  settings = { group_id: consumer_group.id }
  settings = fetch_for(:consumer, consumer_group, settings)
  [sanitize(settings)]
end
.consumption(consumer_group) ⇒ Array<Hash>
Builds all the configuration settings for the kafka consumer consume_each_batch and consume_each_message methods.
# File 'lib/karafka/connection/api_adapter.rb', line 61

def consumption(consumer_group)
  [
    sanitize(
      fetch_for(
        :consumption,
        consumer_group,
        automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
      )
    )
  ]
end
.mark_message_as_processed(params) ⇒ Array
When the default (empty) topic mapper is used, no conversion is needed, as the internal and external formats are exactly the same.
Remaps topic details taking the topic mapper feature into consideration.
# File 'lib/karafka/connection/api_adapter.rb', line 104

def mark_message_as_processed(params)
  # Majority of users don't use custom topic mappers. No need to change anything when it
  # is a default mapper that does not change anything. Only some cloud providers require
  # topics to be remapped
  return [params] if Karafka::App.config.topic_mapper.is_a?(Karafka::Routing::TopicMapper)

  # @note We don't use tap as it is around 13% slower than non-dup version
  dupped = params.dup
  dupped['topic'] = Karafka::App.config.topic_mapper.outgoing(params.topic)
  [dupped]
end
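The short-circuit for the default mapper and the remapping for a custom one can be illustrated with a self-contained sketch. IdentityTopicMapper, PrefixedTopicMapper and remap are hypothetical names invented for this example, and a plain Hash stands in for the Karafka params object; only the outgoing method name is taken from the code above.

```ruby
# Stand-in for the default mapper: the external name equals the internal one.
class IdentityTopicMapper
  def outgoing(topic)
    topic
  end
end

# Hypothetical custom mapper that prefixes topic names, as some hosted
# Kafka providers are said to require.
class PrefixedTopicMapper
  def initialize(prefix)
    @prefix = prefix
  end

  # Internal topic name -> name used on the cluster.
  def outgoing(topic)
    "#{@prefix}#{topic}"
  end
end

# Returns the params wrapped in an array, remapping the topic only when
# a non-default mapper is in use. The original params are never mutated.
def remap(params, mapper)
  return [params] if mapper.is_a?(IdentityTopicMapper)

  dupped = params.dup
  dupped['topic'] = mapper.outgoing(params['topic'])
  [dupped]
end

params = { 'topic' => 'events', 'partition' => 0 }
puts remap(params, IdentityTopicMapper.new).inspect
puts remap(params, PrefixedTopicMapper.new('acct123.')).inspect
```

Skipping the dup in the default case avoids an allocation per message, which matters on a hot path that runs once for every processed message.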
.pause(topic, partition, consumer_group) ⇒ Array
Builds all the configuration settings required by kafka consumer#pause method
# File 'lib/karafka/connection/api_adapter.rb', line 86

def pause(topic, partition, consumer_group)
  [
    Karafka::App.config.topic_mapper.outgoing(topic),
    partition,
    {
      timeout: consumer_group.pause_timeout,
      max_timeout: consumer_group.pause_max_timeout,
      exponential_backoff: consumer_group.pause_exponential_backoff
    }
  ]
end
.subscribe(topic) ⇒ Hash
Builds all the configuration settings for kafka consumer#subscribe method
# File 'lib/karafka/connection/api_adapter.rb', line 76

def subscribe(topic)
  settings = fetch_for(:subscribe, topic)
  [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
end