Class: Deimos::KafkaMessage

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/deimos/kafka_message.rb

Overview

Store Kafka messages into the database.

Instance Method Summary collapse

Instance Method Details

#decoded_message(decoder = self.decoder) ⇒ Hash

Decode the message. This assumes for now that we have access to a producer in the codebase which can decode it.

Parameters:

Returns:

  • (Hash)


31
32
33
34
35
36
37
38
# File 'lib/deimos/kafka_message.rb', line 31

def decoded_message(decoder=self.decoder)
  return { key: self.key, message: self.message } unless decoder

  {
    key: self.key.present? ? decoder.new.decode_key(self.key) : nil,
    payload: decoder.decoder.decode(self.message)
  }
end

#decoderDeimos::Consumer

Returns:



18
19
20
21
22
23
24
25
# File 'lib/deimos/kafka_message.rb', line 18

def decoder
  producer = Deimos::Producer.descendants.find { |c| c.topic == self.topic }
  return nil unless producer

  consumer = Class.new(Deimos::Consumer)
  consumer.config.merge!(producer.config)
  consumer
end

#message=(mess) ⇒ Object

Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.

Parameters:

  • mess (Object)


13
14
15
# File 'lib/deimos/kafka_message.rb', line 13

def message=(mess)
  write_attribute(:message, mess ? mess.to_s : nil)
end

#phobos_messageHash

Returns:

  • (Hash)


41
42
43
44
45
46
47
48
# File 'lib/deimos/kafka_message.rb', line 41

def phobos_message
  {
    payload: self.message,
    partition_key: self.partition_key,
    key: self.key,
    topic: self.topic
  }
end