Class: Deimos::KafkaMessage
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Deimos::KafkaMessage
- Defined in:
- lib/deimos/kafka_message.rb
Overview
Store Kafka messages into the database.
Instance Method Summary collapse
-
#decoded_message(decoder = self.decoder) ⇒ Hash
Decode the message.
- #decoder ⇒ Deimos::Consumer
-
#message=(mess) ⇒ Object
Ensure it gets turned into a string, e.g.
- #phobos_message ⇒ Hash
Instance Method Details
#decoded_message(decoder = self.decoder) ⇒ Hash
Decode the message. This assumes for now that we have access to a producer in the codebase which can decode it.
31 32 33 34 35 36 37 38 |
# File 'lib/deimos/kafka_message.rb', line 31 def (decoder=self.decoder) return { key: self.key, message: self. } unless decoder { key: self.key.present? ? decoder.new.decode_key(self.key) : nil, payload: decoder.decoder.decode(self.) } end |
#decoder ⇒ Deimos::Consumer
18 19 20 21 22 23 24 25 |
# File 'lib/deimos/kafka_message.rb', line 18 def decoder producer = Deimos::Producer.descendants.find { |c| c.topic == self.topic } return nil unless producer consumer = Class.new(Deimos::Consumer) consumer.config.merge!(producer.config) consumer end |
#message=(mess) ⇒ Object
Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.
13 14 15 |
# File 'lib/deimos/kafka_message.rb', line 13 def (mess) write_attribute(:message, mess ? mess.to_s : nil) end |
#phobos_message ⇒ Hash
41 42 43 44 45 46 47 48 |
# File 'lib/deimos/kafka_message.rb', line 41 def { payload: self., partition_key: self.partition_key, key: self.key, topic: self.topic } end |