Module: Karafka::Connection::MessageDelegator

Defined in:
lib/karafka/connection/message_delegator.rb

Overview

Class that delegates processing of a single received message for which we listen to a proper processor

Class Method Summary collapse

Class Method Details

.call(group_id, kafka_message) ⇒ Object

Note:

This should be looped to obtain a constant delegating of new messages

Delegates message (does something with it) It will either schedule or run a proper processor action for the incoming message

Parameters:

  • group_id (String)

    group_id of a group from which a given message came

  • kafka_message (<Kafka::FetchedMessage>)

    raw message from kafka



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/karafka/connection/message_delegator.rb', line 14

def call(group_id, kafka_message)
  topic = Persistence::Topics.fetch(group_id, kafka_message.topic)
  consumer = Persistence::Consumers.fetch(topic, kafka_message.partition)

  Karafka.monitor.instrument(
    'connection.message_delegator.call',
    caller: self,
    consumer: consumer,
    kafka_message: kafka_message
  ) do
    # @note We always get a single message within single delegator, which means that
    # we don't care if user marked it as a batch consumed or not.
    consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages(
      [kafka_message],
      topic
    )
    consumer.call
  end
end