Module: Karafka::Connection::Delegator
- Defined in:
- lib/karafka/connection/delegator.rb
Overview
Class that delegates the processing of the messages we listen for to the proper processor.
Class Method Summary
- .call(group_id, kafka_messages) ⇒ Object
Delegates messages: it either schedules or runs the proper processor action for them.
Class Method Details
.call(group_id, kafka_messages) ⇒ Object
Note:
This should be called in a loop so that new messages are delegated continuously.
Note:
We catch all errors here to make sure that a failure in one consumption does not affect other consumed messages. If we did not catch an error, it would propagate up until it killed the thread.
Note:
This is one large method for performance reasons. It is much faster than using send or invoking additional methods.
Delegates messages: it either schedules or runs the proper processor action for them.
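The notes above describe two behaviours: delegation runs in a loop, and errors are caught so that one failed consumption does not stop the others. A minimal sketch of that pattern follows. The names process_batch and delegate_all are hypothetical stand-ins used only for illustration; they are not part of Karafka and do not reproduce its error handling.

```ruby
# Hypothetical stand-in (not a Karafka method): processes one batch and may raise.
def process_batch(batch)
  raise ArgumentError, "bad batch #{batch.inspect}" if batch.include?(:bad)

  batch.map { |message| message.to_s.upcase }
end

# Hypothetical loop: rescues per batch so that a failure in one batch
# does not prevent the remaining batches from being processed.
def delegate_all(batches)
  results = []
  errors = []

  batches.each do |batch|
    begin
      results << process_batch(batch)
    rescue StandardError => e
      # Record the failure and keep going instead of letting it propagate
      errors << e.message
    end
  end

  [results, errors]
end

results, errors = delegate_all([[:a, :b], [:bad], [:c]])
puts "processed: #{results.length}, failed: #{errors.length}"
```

Rescuing inside the loop body, rather than around the whole loop, is what keeps later batches unaffected by an earlier failure.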
# File 'lib/karafka/connection/delegator.rb', line 18

def call(group_id, kafka_messages)
  # @note We always get messages by topic and partition so we can take topic from the
  #   first one and it will be valid for all the messages
  topic = Persistence::Topic.fetch(group_id, kafka_messages[0].topic)
  consumer = Persistence::Consumer.fetch(topic, kafka_messages[0].partition)

  Karafka.monitor.instrument(
    'connection.delegator.call',
    caller: self,
    consumer: consumer,
    kafka_messages: kafka_messages
  ) do
    # Depending on a case (persisted or not) we might use new consumer instance per
    # each batch, or use the same one for all of them (for implementing buffering, etc.)
    if topic.batch_consuming
      consumer.params_batch = kafka_messages
      consumer.call
    else
      kafka_messages.each do |kafka_message|
        consumer.params_batch = [kafka_message]
        consumer.call
      end
    end
  end
end
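The branch on topic.batch_consuming in the method above can be tried in isolation with stand-in objects. The sketch below is illustrative only: SketchTopic, RecordingConsumer and delegate are hypothetical names, not Karafka classes, and the sketch leaves out the persistence lookup and instrumentation.

```ruby
# Hypothetical stand-in for a topic that only exposes the batch_consuming flag.
SketchTopic = Struct.new(:batch_consuming)

# Hypothetical consumer that records every params_batch it is called with.
class RecordingConsumer
  attr_accessor :params_batch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def call
    @calls << params_batch
  end
end

# Mirrors the branching shown above: one call for the whole batch,
# or one call per message wrapped in a single-element array.
def delegate(topic, consumer, messages)
  if topic.batch_consuming
    consumer.params_batch = messages
    consumer.call
  else
    messages.each do |message|
      consumer.params_batch = [message]
      consumer.call
    end
  end
end

batch_consumer = RecordingConsumer.new
delegate(SketchTopic.new(true), batch_consumer, %w[a b c])
puts batch_consumer.calls.length

single_consumer = RecordingConsumer.new
delegate(SketchTopic.new(false), single_consumer, %w[a b c])
puts single_consumer.calls.length
```

With batch consuming enabled the consumer is invoked once with all messages; otherwise it is invoked once per message, each time with a one-element batch.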