Module: Karafka::Connection::ApiAdapter

Defined in:
lib/karafka/connection/api_adapter.rb

Overview

Note:

The good thing about the Kafka.new method is that it ignores all options that do nothing, so we don’t have to worry about injecting our internal settings into the client and breaking something.

Mapper used to convert our internal settings into ruby-kafka settings based on their API requirements. ruby-kafka keeps gaining options, and they have to be applied on several “levels” (even though in Karafka you configure all of them in one place), so we have to remap them into what the ruby-kafka driver requires.
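The remapping idea described above can be sketched as follows. This is a minimal illustration, not Karafka's actual implementation: `API_LEVELS` is a hypothetical stand-in for `AttributesMap.api_adapter`, and the setting names and helper methods (`settings_for`, `client_settings`) are assumptions chosen only to show how one flat configuration can be split per API level.

```ruby
# Hypothetical stand-in for AttributesMap.api_adapter: it lists which flat
# setting names belong to which ruby-kafka API level. The names are
# illustrative, not the real map.
API_LEVELS = {
  consumer: %i[session_timeout offset_commit_interval],
  subscribe: %i[start_from_beginning max_bytes_per_partition],
  consumption: %i[min_bytes max_wait_time]
}.freeze

# One flat configuration, as a user would define it in a single place
FLAT_SETTINGS = {
  seed_brokers: ['kafka://localhost:9092'],
  session_timeout: 30,
  start_from_beginning: true,
  min_bytes: 1
}.freeze

# Picks only the settings that belong to the requested API level
def settings_for(level, flat_settings)
  flat_settings.slice(*API_LEVELS.fetch(level))
end

# Everything not claimed by any level is passed to the client itself
def client_settings(flat_settings)
  claimed = API_LEVELS.values.flatten
  flat_settings.reject { |name, _value| claimed.include?(name) }
end
```

With this split, `settings_for(:consumer, FLAT_SETTINGS)` yields only the consumer-level options, while `client_settings(FLAT_SETTINGS)` keeps whatever no level has claimed.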

Class Method Details

.client ⇒ Array<Hash>

Note:

We return an array, so we can inject any arguments we want in case of changes in the raw driver

Builds all the configuration settings for the Kafka.new method

Returns:

  • (Array<Hash>)

    Array with all the client arguments, including a hash with all the settings required by the Kafka.new method



# File 'lib/karafka/connection/api_adapter.rb', line 21

def client
  # This one is a default that takes all the settings except special
  # cases defined in the map
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each do |setting_name, setting_value|
    # All options for config adapter should be ignored as we're just interested
    # in what is left, as we want to pass all the options that are "typical"
    # and not listed in the api_adapter special cases mapping. All the values
    # from the api_adapter mapping go somewhere else, not to the client directly
    next if AttributesMap.api_adapter.values.flatten.include?(setting_name)

    settings[setting_name] = setting_value
  end

  settings_hash = sanitize(settings)

  # Normalization for the way Kafka::Client accepts arguments since 0.5.3
  [settings_hash.delete(:seed_brokers), settings_hash]
end
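To make the return shape of `.client` concrete, here is a minimal sketch of how the two-element array might be consumed. `build_client` is a hypothetical stand-in for `Kafka.new` (assumed here to take the seed brokers first and the remaining settings as keyword options), and `client_args` only mirrors the last two lines of the method above; neither is part of Karafka.

```ruby
# Hypothetical stand-in for Kafka.new: brokers first, remaining settings as
# keyword options. It only records what it received.
def build_client(seed_brokers, **options)
  { brokers: seed_brokers, options: options }
end

# Mirrors the final step of .client: pull :seed_brokers out of the settings
# hash and return it ahead of whatever remains
def client_args(settings)
  settings_hash = settings.dup
  [settings_hash.delete(:seed_brokers), settings_hash]
end

SETTINGS = {
  seed_brokers: ['kafka://localhost:9092'],
  client_id: 'example_app'
}.freeze

# Destructure explicitly and double-splat the hash, so the call also works on
# Ruby 3, where a trailing hash is no longer converted to keywords implicitly
seed_brokers, options = client_args(SETTINGS)
CLIENT = build_client(seed_brokers, **options)
```

Returning an array rather than a bare hash leaves room for extra positional arguments if the driver's signature changes again.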

.consumer(consumer_group) ⇒ Array<Hash>

Builds all the configuration settings for the Kafka#consumer method

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Array<Hash>)

    array with all the consumer arguments including hash with all the settings required by Kafka#consumer



# File 'lib/karafka/connection/api_adapter.rb', line 49

def consumer(consumer_group)
  settings = { group_id: consumer_group.id }
  settings = fetch_for(:consumer, consumer_group, settings)
  [sanitize(settings)]
end

.consumption(consumer_group) ⇒ Array<Hash>

Builds all the configuration settings for the Kafka consumer consume_each_batch and consume_each_message methods

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Array<Hash>)

    Array with all the arguments required by the consuming method, including a hash with all the settings required by the Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch methods



# File 'lib/karafka/connection/api_adapter.rb', line 61

def consumption(consumer_group)
  [
    sanitize(
      fetch_for(
        :consumption,
        consumer_group,
        automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
      )
    )
  ]
end

.mark_message_as_processed(params) ⇒ Array

Note:

When the default (empty) topic mapper is used, no conversion is needed, as the internal and external formats are exactly the same

Remaps topic details taking the topic mapper feature into consideration.

Parameters:

Returns:

  • (Array)

    array with all the details needed by ruby-kafka to mark message as processed



# File 'lib/karafka/connection/api_adapter.rb', line 104

def mark_message_as_processed(params)
  # Majority of users don't use custom topic mappers. No need to change anything when it
  # is a default mapper that does not change anything. Only some cloud providers require
  # topics to be remapped
  return [params] if Karafka::App.config.topic_mapper.is_a?(Karafka::Routing::TopicMapper)

  # @note We don't use tap as it is around 13% slower than non-dup version
  dupped = params.dup
  dupped['topic'] = Karafka::App.config.topic_mapper.outgoing(params.topic)
  [dupped]
end
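The effect of a custom topic mapper on this method can be sketched in isolation. The two mapper classes, the `mark_args` helper, and the hash used in place of a params object are all hypothetical; only the `outgoing` call and the dup-then-overwrite step follow the source above.

```ruby
# Hypothetical default mapper: topic names pass through unchanged
class DefaultMapper
  def outgoing(topic)
    topic
  end
end

# Hypothetical custom mapper for a provider that prefixes every topic name
class PrefixMapper
  def outgoing(topic)
    "tenant1.#{topic}"
  end
end

# Follows the same steps as mark_message_as_processed, with a plain hash
# standing in for the params object
def mark_args(params, mapper)
  # The default mapper changes nothing, so the original object is reused
  return [params] if mapper.is_a?(DefaultMapper)

  # Copy first, so the caller's params keep the internal topic name
  dupped = params.dup
  dupped['topic'] = mapper.outgoing(params['topic'])
  [dupped]
end

PARAMS = { 'topic' => 'events', 'partition' => 0, 'offset' => 42 }.freeze
```

With `DefaultMapper` the very same object comes back wrapped in an array; with `PrefixMapper` a copy carrying the external topic name is returned and `PARAMS` stays untouched.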

.pause(topic, partition, consumer_group) ⇒ Array

Builds all the configuration settings required by the Kafka consumer#pause method

Parameters:

  • topic (String)

    topic that we want to pause

  • partition (Integer)

    number of the partition that we want to pause

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Array)

    array with all the details required to pause kafka consumer



# File 'lib/karafka/connection/api_adapter.rb', line 86

def pause(topic, partition, consumer_group)
  [
    Karafka::App.config.topic_mapper.outgoing(topic),
    partition,
    {
      timeout: consumer_group.pause_timeout,
      max_timeout: consumer_group.pause_max_timeout,
      exponential_backoff: consumer_group.pause_exponential_backoff
    }
  ]
end
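A minimal sketch of how the pause arguments might travel from consumer group settings to a consumer. `GroupSettings` and `RecordingConsumer` are hypothetical stand-ins, the `pause` keyword signature is an assumption about the driver, and the topic mapper step is left out for brevity.

```ruby
# Hypothetical stand-in for the consumer group settings read by .pause
GroupSettings = Struct.new(:pause_timeout, :pause_max_timeout, :pause_exponential_backoff)

# Hypothetical stand-in for a ruby-kafka consumer; it only records the call.
# The keyword signature is an assumption, not taken from the driver.
class RecordingConsumer
  attr_reader :paused

  def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
    @paused = {
      topic: topic,
      partition: partition,
      timeout: timeout,
      max_timeout: max_timeout,
      exponential_backoff: exponential_backoff
    }
  end
end

# Same shape as the array built by .pause, without the topic mapper
def pause_args(topic, partition, group)
  [
    topic,
    partition,
    {
      timeout: group.pause_timeout,
      max_timeout: group.pause_max_timeout,
      exponential_backoff: group.pause_exponential_backoff
    }
  ]
end

GROUP = GroupSettings.new(10, 60, true)
CONSUMER = RecordingConsumer.new

# Explicit destructuring keeps the keyword options working on Ruby 3
topic, partition, options = pause_args('events', 0, GROUP)
CONSUMER.pause(topic, partition, **options)
```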

.subscribe(topic) ⇒ Array

Builds all the configuration settings for the Kafka consumer#subscribe method

Parameters:

Returns:

  • (Array)

    array with the mapped topic name followed by a hash with all the settings required by the Kafka consumer#subscribe method



# File 'lib/karafka/connection/api_adapter.rb', line 76

def subscribe(topic)
  settings = fetch_for(:subscribe, topic)
  [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
end