Module: Karafka::Connection::MessageDelegator
Defined in: lib/karafka/connection/message_delegator.rb
Overview
Module that delegates the processing of a single received message, from a topic we listen on, to the proper processor (consumer).
Class Method Summary
- .call(group_id, kafka_message) ⇒ Object
Delegates a message: either schedules or runs the proper processor action for the incoming message.
Class Method Details
.call(group_id, kafka_message) ⇒ Object
Delegates a message: either schedules or runs the proper processor action for the incoming message.
Note:
This method handles one message per call, so call it in a loop to keep delegating new messages as they arrive.
# File 'lib/karafka/connection/message_delegator.rb', line 14

def call(group_id, kafka_message)
  topic = Persistence::Topics.fetch(group_id, kafka_message.topic)
  consumer = Persistence::Consumers.fetch(topic, kafka_message.partition)

  Karafka.monitor.instrument(
    'connection.message_delegator.call',
    caller: self,
    consumer: consumer,
    kafka_message: kafka_message
  ) do
    # @note We always get a single message within a single delegator call, so it
    #   does not matter whether the user marked the topic as batch-consumed or not.
    consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages(
      [kafka_message],
      topic
    )
    consumer.call
  end
end
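The delegation flow above can be illustrated with a small standalone sketch. It does not use Karafka: `Message`, `RecordingConsumer`, `ConsumerRegistry`, and `SketchDelegator` are hypothetical stand-ins invented for this example, and the instrumentation wrapper is left out. It only shows the general shape: resolve a consumer for the message's topic and partition, hand it a batch containing that single message, run it, and repeat for each incoming message.

```ruby
# Hypothetical stand-in for a fetched Kafka message (not a Karafka class).
Message = Struct.new(:topic, :partition, :payload)

# Hypothetical consumer that records every message it was asked to process.
class RecordingConsumer
  attr_accessor :params_batch
  attr_reader :processed

  def initialize
    @processed = []
  end

  def call
    @processed.concat(params_batch)
  end
end

# Hypothetical registry that reuses one consumer per [topic, partition] pair,
# similar in spirit to a persistence layer that caches consumer instances.
class ConsumerRegistry
  def initialize
    @consumers = Hash.new { |hash, key| hash[key] = RecordingConsumer.new }
  end

  def fetch(topic, partition)
    @consumers[[topic, partition]]
  end
end

# Sketch of a delegator: resolve the consumer, assign a one-element batch, run it.
module SketchDelegator
  REGISTRY = ConsumerRegistry.new

  def self.call(message)
    consumer = REGISTRY.fetch(message.topic, message.partition)
    # A single message is always wrapped in a batch of one.
    consumer.params_batch = [message]
    consumer.call
    consumer
  end
end

# The delegator handles one message per call, so the caller loops over messages.
INCOMING = [
  Message.new('events', 0, 'a'),
  Message.new('events', 0, 'b'),
  Message.new('events', 1, 'c')
].freeze

INCOMING.each { |message| SketchDelegator.call(message) }
```

In this sketch the two messages from partition 0 reach the same consumer instance, while the message from partition 1 gets its own. Whether the real persistence layer caches consumers in exactly this way is an assumption of the example.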