Module: Karafka::Connection::Processor
- Defined in: lib/karafka/connection/processor.rb
Overview
Module that consumes the messages we are listening for.
Class Method Summary
- .process(group_id, kafka_messages) ⇒ Object
  Processes messages (does something with them). It will either schedule or run a proper controller action for messages.
Class Method Details
.process(group_id, kafka_messages) ⇒ Object
Note:
This should be called in a loop to keep listening continuously.
Note:
We catch all errors here so that a failure in one consumption does not affect other consumed messages. If we did not catch them, they would propagate up and kill the thread.
Processes messages (does something with them). It will either schedule or run a proper controller action for messages.
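The two notes above can be illustrated with a minimal sketch. It is not Karafka's implementation: `FakeProcessor`, `safe_process`, `errors`, and the in-memory `batches` array are hypothetical names, used only to show how rescuing inside the processing call lets a surrounding loop continue after one batch fails.

```ruby
# Hypothetical stand-in for a processor; not part of Karafka.
module FakeProcessor
  class << self
    # Collects error descriptions instead of letting errors propagate.
    def errors
      @errors ||= []
    end

    # Processes one batch and rescues any StandardError, so the
    # caller's loop (and the thread running it) keeps going.
    def safe_process(group_id, messages)
      raise ArgumentError, 'empty batch' if messages.empty?

      messages.map(&:upcase)
    rescue StandardError => e
      errors << "#{group_id}: #{e.message}"
      nil
    end
  end
end

# Simulated stream of batches; the second one fails (assumption for the demo).
batches = [%w[a b], [], %w[c]]

# Iterating over the batches stands in for constant listening.
results = batches.map { |batch| FakeProcessor.safe_process('group', batch) }
```

In this sketch the failing batch yields `nil` and is recorded in `FakeProcessor.errors`, while the batches before and after it are still processed.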
# File 'lib/karafka/connection/processor.rb', line 16

def process(group_id, kafka_messages)
  # @note We always get messages by topic and partition so we can take topic from the
  # first one and it will be valid for all the messages
  # We map from incoming topic name, as it might be namespaced, etc.
  # @see topic_mapper internal docs
  mapped_topic_name = Karafka::App.config.topic_mapper.incoming(kafka_messages[0].topic)
  topic = Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  controller = Persistence::Controller.fetch(topic, kafka_messages[0].partition) do
    topic.controller.new
  end

  # Depending on a case (persisted or not) we might use new controller instance per each
  # batch, or use the same instance for all of them (for implementing buffering, etc)
  send(
    topic.batch_consuming ? :process_batch : :process_each,
    controller,
    kafka_messages
  )
end
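The controller lookup and the `send` dispatch in `process` can be tried in isolation with a small sketch. Everything here is hypothetical (`FakeTopic`, `FakeController`, `FakeDispatcher`, and the bodies of `process_batch` and `process_each`). It also assumes that a controller is always cached per topic and partition, whereas the source comment says this depends on whether persistence is enabled.

```ruby
# Hypothetical names throughout; this is not Karafka's code.
FakeTopic = Struct.new(:name, :batch_consuming)

class FakeController
  attr_reader :calls

  def initialize
    @calls = []
  end
end

module FakeDispatcher
  class << self
    # Caches one controller per topic name and partition (assumed behavior).
    # The block is evaluated only on a cache miss.
    def fetch(topic, partition)
      @cache ||= {}
      @cache[[topic.name, partition]] ||= yield
    end

    def process(topic, partition, messages)
      controller = fetch(topic, partition) { FakeController.new }
      # The ternary picks a method name at runtime and send invokes it.
      send(
        topic.batch_consuming ? :process_batch : :process_each,
        controller,
        messages
      )
      controller
    end

    private

    # Hands the whole batch to the controller in one call.
    def process_batch(controller, messages)
      controller.calls << messages
    end

    # Hands messages to the controller one at a time.
    def process_each(controller, messages)
      messages.each { |message| controller.calls << [message] }
    end
  end
end

batched = FakeDispatcher.process(FakeTopic.new('events', true), 0, %w[a b])
single = FakeDispatcher.process(FakeTopic.new('logs', false), 0, %w[a b])
```

With `batch_consuming` set, the controller in this sketch receives the batch once. Without it, the controller receives one call per message. A second call for the same topic and partition reuses the cached controller, which is what would make buffering across batches possible.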