Module: Karafka::Connection::Processor

Defined in:
lib/karafka/connection/processor.rb

Overview

Consumes the incoming messages that we listen for

Class Method Summary

Class Method Details

.process(group_id, kafka_messages) ⇒ Object

Note:

Call this in a loop to keep listening continuously

Note:

We catch all errors here so that a failure while consuming one set of messages does not affect other consumed messages. If we did not catch them, they would propagate up and kill the thread.
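
The error isolation described in this note can be sketched as follows. This is a minimal illustration, not Karafka's implementation: `handle_batch` and `consume_all` are hypothetical names, and the "failure" is simulated by raising on an empty batch.

```ruby
# Hypothetical handler: raises on an empty batch to simulate a failure.
def handle_batch(batch)
  raise ArgumentError, 'empty batch' if batch.empty?

  batch.size
end

# Processes every batch and rescues per batch, so one failure is recorded
# without stopping the loop (or the thread running it).
def consume_all(batches)
  batches.map do |batch|
    begin
      [:ok, handle_batch(batch)]
    rescue StandardError => e
      [:failed, e.message]
    end
  end
end
```

Because the rescue sits inside the loop body, the batches after a failing one are still processed.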

Processes messages (does something with them). It will either schedule or run the proper controller action for the messages.

Parameters:

  • group_id (String)

    group_id of a group from which a given message came

  • kafka_messages (Array<Kafka::FetchedMessage>)

    raw messages fetched from kafka
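
As a rough illustration of how the two parameters combine, the method source builds a routing lookup key from `group_id` and the mapped topic of the first message. The sketch below assumes a hypothetical mapper that strips a namespace prefix. `PrefixStrippingMapper`, `FetchedMessageStub` and `routing_key` are illustrative names, not part of Karafka.

```ruby
# Hypothetical mapper: strips an assumed namespace prefix from topic names.
class PrefixStrippingMapper
  def initialize(prefix)
    @prefix = prefix
  end

  def incoming(topic_name)
    topic_name.delete_prefix(@prefix)
  end
end

# Hypothetical stand-in for a fetched message; only the fields used here.
FetchedMessageStub = Struct.new(:topic, :partition)

# All messages in one call share a topic and partition, so the first
# message is enough to derive the key.
def routing_key(group_id, messages, mapper)
  "#{group_id}_#{mapper.incoming(messages[0].topic)}"
end
```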



# File 'lib/karafka/connection/processor.rb', line 16

def process(group_id, kafka_messages)
  # @note We always get messages by topic and partition so we can take topic from the
  # first one and it will be valid for all the messages
  # We map from incoming topic name, as it might be namespaced, etc.
  # @see topic_mapper internal docs
  mapped_topic_name = Karafka::App.config.topic_mapper.incoming(kafka_messages[0].topic)
  topic = Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  controller = Persistence::Controller.fetch(topic, kafka_messages[0].partition) do
    topic.controller.new
  end

  # Depending on whether the controller is persisted, we either use a new controller
  # instance for each batch or reuse the same instance for all of them (for buffering, etc.)
  send(
    topic.batch_consuming ? :process_batch : :process_each,
    controller,
    kafka_messages
  )
end
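
The `send`-based dispatch at the end of the method can be tried in isolation with stand-in objects. This is a sketch under stated assumptions: `RecordingController` and `SketchProcessor` are hypothetical, and the bodies of `process_batch` and `process_each` here only record what they receive; the real methods are not shown on this page.

```ruby
# Hypothetical stand-in for a controller; records what it was asked to handle.
class RecordingController
  attr_reader :calls

  def initialize
    @calls = []
  end
end

# Hypothetical stand-in for the processor; mirrors only the dispatch step.
module SketchProcessor
  class << self
    # Picks the handler name from a flag and invokes it dynamically.
    def process(batch_consuming, controller, messages)
      send(batch_consuming ? :process_batch : :process_each, controller, messages)
    end

    private

    # Hands the whole batch over in one call.
    def process_batch(controller, messages)
      controller.calls << messages
    end

    # Hands the messages over one at a time.
    def process_each(controller, messages)
      messages.each { |message| controller.calls << [message] }
    end
  end
end
```

With the flag set, the controller sees one call containing all messages. Without it, the controller sees one call per message. `send` is used because it can invoke private methods by name.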