Module: Karafka::Params::Dsl::ClassMethods
- Defined in:
- lib/karafka/params/dsl.rb
Overview
Class methods required by params to work
Instance Method Summary collapse
-
#build(message, parser) ⇒ Karafka::Params::Params
Karafka params object not yet used parser for retrieving data that we’ve got from Kafka.
Instance Method Details
#build(message, parser) ⇒ Karafka::Params::Params
Returns Karafka params object not yet used parser for retrieving data that we’ve got from Kafka.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/karafka/params/dsl.rb', line 60 def build(, parser) instance = new instance['parser'] = parser # Non kafka fetched message can happen when we interchange data with an # additional backend if .is_a?(Kafka::FetchedMessage) instance.send( :merge!, 'value' => .value, 'partition' => .partition, 'offset' => .offset, 'key' => .key, 'create_time' => .create_time, 'receive_time' => Time.now, # When we get raw messages, they might have a topic, that was modified by a # topic mapper. We need to "reverse" this change and map back to the non-modified # format, so our internal flow is not corrupted with the mapping 'topic' => Karafka::App.config.topic_mapper.incoming(.topic) ) else instance.send(:merge!, ) end instance end |