Module: Deimos::KafkaSource

Extended by:
ActiveSupport::Concern
Defined in:
lib/deimos/kafka_source.rb

Overview

Represents an object which needs to inform Kafka when it is saved or bulk imported.

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary collapse

Instance Method Details

#deletion_payloadHash

Payload to send after we are destroyed.

Returns:

  • (Hash)


54
55
56
# File 'lib/deimos/kafka_source.rb', line 54

def deletion_payload
  { payload_key: self[self.class.primary_key] }
end

#send_kafka_event_on_createvoid

This method returns an undefined value.

Send the newly created model to Kafka.



17
18
19
20
21
22
23
# File 'lib/deimos/kafka_source.rb', line 17

def send_kafka_event_on_create
  return unless self.persisted?
  return unless self.class.kafka_config[:create]

  self.truncate_columns if Deimos.config.producers.truncate_columns
  self.class.kafka_producers.each { |p| p.send_event(self) }
end

#send_kafka_event_on_destroyvoid

This method returns an undefined value.

Send a deletion (null payload) event to Kafka.



46
47
48
49
50
# File 'lib/deimos/kafka_source.rb', line 46

def send_kafka_event_on_destroy
  return unless self.class.kafka_config[:delete]

  self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) }
end

#send_kafka_event_on_updatevoid

This method returns an undefined value.

Send the newly updated model to Kafka.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/deimos/kafka_source.rb', line 27

def send_kafka_event_on_update
  return unless self.class.kafka_config[:update]

  producers = self.class.kafka_producers
  fields = producers.flat_map { |p| p.watched_attributes(self) }.uniq
  fields -= ['updated_at']
  # Only send an event if a field we care about was changed.
  any_changes = fields.any? do |field|
    field_change = self.previous_changes[field]
    field_change.present? && field_change[0] != field_change[1]
  end
  return unless any_changes
  self.truncate_columns if Deimos.config.producers.truncate_columns

  producers.each { |p| p.send_event(self) }
end

#truncate_columnsObject

check if any field has value longer than the field limit



127
128
129
130
131
132
133
134
135
136
# File 'lib/deimos/kafka_source.rb', line 127

def truncate_columns
  self.class.columns.each do |col|
    next unless col.type == :string
    next if self[col.name].blank?
    if self[col.name].to_s.length > col.limit
      self[col.name] = self[col.name][0..col.limit - 1]
    end
  end
  false
end