Module: Karafka::Connection::ApiAdapter

Defined in:
lib/karafka/connection/api_adapter.rb

Overview

Note:

The good thing about the Kafka.new method is that it ignores all options that have no effect, so we don’t have to worry about injecting our internal settings into the client and breaking things.

Mapper used to convert our internal settings into ruby-kafka settings based on their API requirements. Because ruby-kafka keeps gaining options, and those options have to be applied at several “levels” (even though in Karafka you configure all of them in one place), we have to remap them into what the ruby-kafka driver requires.

Class Method Details

.client(consumer_group) ⇒ Array<Hash>

Note:

We return an array so that we can inject any arguments we want if the raw driver changes.

Builds all the configuration settings for Kafka.new method

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Array<Hash>)

    Array with all the client arguments, including a hash with all the settings required by the Kafka.new method



# File 'lib/karafka/connection/api_adapter.rb', line 22

def client(consumer_group)
  # This one is a default that takes all the settings except special
  # cases defined in the map
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each_key do |setting_name|
    # All options for config adapter should be ignored as we're just interested
    # in what is left, as we want to pass all the options that are "typical"
    # and not listed in the api_adapter special cases mapping. All the values
    # from the api_adapter mapping go somewhere else, not to the client directly
    next if AttributesMap.api_adapter.values.flatten.include?(setting_name)

    # Settings for each consumer group are either defined per consumer group or are
    # inherited from the global/general settings level, thus we don't have to fetch them
    # from the kafka settings as they are already on a consumer group level
    settings[setting_name] = consumer_group.public_send(setting_name)
  end

  settings_hash = sanitize(settings)

  # Normalization for the way Kafka::Client accepts arguments as of 0.5.3
  [settings_hash.delete(:seed_brokers), settings_hash]
end
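
The filtering loop and the final array split above can be illustrated in isolation. The following is a minimal sketch that runs without Karafka or ruby-kafka; `SketchGroup`, `SketchClient`, `SPECIAL_CASE_KEYS`, and `sketch_client_args` are hypothetical names invented for this example, and the setting names are placeholders rather than the real attributes map.

```ruby
# Hypothetical stand-ins: none of these names come from Karafka or ruby-kafka.
# Keys listed here play the role of settings claimed by a special-case mapping.
SPECIAL_CASE_KEYS = %i[session_timeout max_bytes].freeze

SketchGroup = Struct.new(
  :seed_brokers, :connect_timeout, :session_timeout, :max_bytes,
  keyword_init: true
)

# Mimics a constructor that takes brokers positionally and the rest as keywords
class SketchClient
  attr_reader :brokers, :options

  def initialize(brokers, **options)
    @brokers = brokers
    @options = options
  end
end

def sketch_client_args(group, setting_names)
  settings = { client_id: 'sketch_app' }

  setting_names.each do |name|
    # Settings claimed by a special-case mapping are applied elsewhere
    next if SPECIAL_CASE_KEYS.include?(name)

    settings[name] = group.public_send(name)
  end

  # Brokers go first as a positional argument, the remaining hash follows
  [settings.delete(:seed_brokers), settings]
end

group = SketchGroup.new(
  seed_brokers: ['kafka://localhost:9092'],
  connect_timeout: 10,
  session_timeout: 30,
  max_bytes: 1_048_576
)

brokers, options = sketch_client_args(group, group.members)
client = SketchClient.new(brokers, **options)
```

Returning an array keeps the call site independent of how many positional arguments the driver expects; the caller only destructures and forwards whatever the adapter produced.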

.consumer(consumer_group) ⇒ Hash

Builds all the configuration settings for kafka#consumer method

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Hash)

    all the consumer keyword arguments, including a hash with all the settings required by Kafka#consumer



# File 'lib/karafka/connection/api_adapter.rb', line 53

def consumer(consumer_group)
  settings = { group_id: consumer_group.id }
  settings = fetch_for(:consumer, consumer_group, settings)
  sanitize(settings)
end
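
The `fetch_for` and `sanitize` helpers used above are not shown on this page. As a rough sketch of the idea only, and assuming that `fetch_for` collects the settings listed for a namespace and that `sanitize` drops unset values, the flow might look like the following. `SKETCH_MAP`, `SketchConsumerGroup`, `sketch_fetch_for`, and `sketch_sanitize` are hypothetical names, and the behavior is an assumption rather than the library's implementation.

```ruby
# Hypothetical mapping of namespaces to setting names; the real mapping is
# not shown in this document.
SKETCH_MAP = {
  consumer: %i[session_timeout offset_commit_interval],
  subscribe: %i[start_from_beginning]
}.freeze

SketchConsumerGroup = Struct.new(
  :id, :session_timeout, :offset_commit_interval, :start_from_beginning,
  keyword_init: true
)

# Collects the settings listed for a namespace and merges them into a copy
# of the base hash
def sketch_fetch_for(namespace, route_layer, preexisting = {})
  SKETCH_MAP.fetch(namespace).each_with_object(preexisting.dup) do |name, acc|
    acc[name] = route_layer.public_send(name)
  end
end

# Assumption: unset (nil) values are dropped so that driver defaults apply
def sketch_sanitize(settings)
  settings.reject { |_key, value| value.nil? }
end

group = SketchConsumerGroup.new(id: 'example_group', session_timeout: 30)
consumer_settings = sketch_sanitize(
  sketch_fetch_for(:consumer, group, { group_id: group.id })
)
```

In this sketch only `group_id` and `session_timeout` survive, because `offset_commit_interval` was never set on the group.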

.consumption(consumer_group) ⇒ Hash

Builds all the configuration settings for the kafka consumer consume_each_batch and consume_each_message methods

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Hash)

    hash with all the arguments required by the consuming methods, including all the settings required by Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch



# File 'lib/karafka/connection/api_adapter.rb', line 65

def consumption(consumer_group)
  sanitize(
    fetch_for(
      :consumption,
      consumer_group,
      automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
    )
  )
end
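
Note that the method above translates the Karafka-side name `automatically_mark_as_consumed` into the `automatically_mark_as_processed` key. A minimal sketch of how such a hash could be handed to a keyword-only method follows; `SketchConsumer`, `SketchSettings`, and `sketch_consumption` are hypothetical, and the stub's signature is an assumption made for illustration, not the driver's actual one.

```ruby
# Hypothetical consumer whose consuming method accepts keyword arguments only
class SketchConsumer
  def consume_each_message(min_bytes: 1, max_wait_time: 1,
                           automatically_mark_as_processed: true)
    # Echo back what was received so the effect of the hash is visible
    {
      min_bytes: min_bytes,
      max_wait_time: max_wait_time,
      automatically_mark_as_processed: automatically_mark_as_processed
    }
  end
end

SketchSettings = Struct.new(
  :max_wait_time, :automatically_mark_as_consumed,
  keyword_init: true
)

def sketch_consumption(group)
  {
    max_wait_time: group.max_wait_time,
    # The application-side name is translated to the name the stub expects
    automatically_mark_as_processed: group.automatically_mark_as_consumed
  }
end

group = SketchSettings.new(max_wait_time: 5, automatically_mark_as_consumed: false)
received = SketchConsumer.new.consume_each_message(**sketch_consumption(group))
```

Keys that the hash does not contain (here `min_bytes`) fall back to the method's own defaults.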

.mark_message_as_processed(params) ⇒ Array

Note:

When the default empty topic mapper is used, no conversion is needed, as the internal and external formats are exactly the same.

Remaps topic details taking the topic mapper feature into consideration.

Parameters:

Returns:

  • (Array)

    array with all the details needed by ruby-kafka to mark message as processed



# File 'lib/karafka/connection/api_adapter.rb', line 106

def mark_message_as_processed(params)
  # Majority of users don't use custom topic mappers. No need to change anything when it
  # is a default mapper that does not change anything. Only some cloud providers require
  # topics to be remapped
  return [params.] if Karafka::App.config.topic_mapper.is_a?(
    Karafka::Routing::TopicMapper
  )

  # @note We don't use tap as it is around 13% slower than non-dup version
  dupped = params..dup
  dupped['topic'] = Karafka::App.config.topic_mapper.outgoing(params..topic)
  [dupped]
end
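
The remapping branch can be illustrated with a custom mapper. This is a sketch under stated assumptions: `SketchPrefixMapper` and `sketch_mark_args` are hypothetical, and a plain hash stands in for the message details, whose real type is not shown here. Only the `outgoing` method name is taken from the listing above.

```ruby
# Hypothetical mapper that adds a provider-specific prefix to outgoing topics
class SketchPrefixMapper
  PREFIX = 'tenant.'

  def outgoing(topic)
    "#{PREFIX}#{topic}"
  end
end

# Duplicates the details before changing the topic so that the caller's
# object keeps the internal topic name
def sketch_mark_args(details, mapper)
  dupped = details.dup
  dupped['topic'] = mapper.outgoing(details['topic'])
  [dupped]
end

original = { 'topic' => 'events', 'partition' => 0, 'offset' => 42 }
args = sketch_mark_args(original, SketchPrefixMapper.new)
```

Working on a duplicate matters because the same details may still be used inside the application, where the unmapped topic name is expected.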

.pause(topic, partition, consumer_group) ⇒ Array

Builds all the configuration settings required by kafka consumer#pause method

Parameters:

  • topic (String)

    topic that we want to pause

  • partition (Integer)

    number of the partition that we want to pause

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group details

Returns:

  • (Array)

    array with all the details required to pause kafka consumer



# File 'lib/karafka/connection/api_adapter.rb', line 88

def pause(topic, partition, consumer_group)
  [
    Karafka::App.config.topic_mapper.outgoing(topic),
    partition,
    {
      timeout: consumer_group.pause_timeout,
      max_timeout: consumer_group.pause_max_timeout,
      exponential_backoff: consumer_group.pause_exponential_backoff
    }
  ]
end
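
The returned array mixes two positional values with a trailing options hash. On Ruby 3.0 and later, a hash inside a splatted array is no longer converted to keyword arguments automatically, so one way to forward it is to destructure the array and double-splat the hash. The sketch below assumes a `pause` method with two positional parameters and keyword options; `SketchPausableConsumer` is a hypothetical stub, not the driver's consumer.

```ruby
# Hypothetical consumer stub that records what it was asked to pause
class SketchPausableConsumer
  attr_reader :paused

  def initialize
    @paused = {}
  end

  # Assumed signature for illustration: two positionals plus keyword options
  def pause(topic, partition, timeout: nil, max_timeout: nil,
            exponential_backoff: false)
    @paused[[topic, partition]] = {
      timeout: timeout,
      max_timeout: max_timeout,
      exponential_backoff: exponential_backoff
    }
  end
end

# Same shape as the array built above: topic, partition, options hash
pause_args = ['events', 0, { timeout: 10, max_timeout: nil, exponential_backoff: true }]

topic, partition, options = pause_args
consumer = SketchPausableConsumer.new
consumer.pause(topic, partition, **options)
```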

.subscribe(topic) ⇒ Array

Builds all the configuration settings for the kafka consumer#subscribe method

Parameters:

Returns:

  • (Array)

    array with the mapped topic name followed by a hash with all the settings required by the kafka consumer#subscribe method



# File 'lib/karafka/connection/api_adapter.rb', line 78

def subscribe(topic)
  settings = fetch_for(:subscribe, topic)
  [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
end