Module: Deimos::KafkaSource

Extended by:
ActiveSupport::Concern
Defined in:
lib/deimos/kafka_source.rb

Overview

Represents an object which needs to inform Kafka when it is saved or bulk imported.

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary

Instance Method Details

#deletion_payload ⇒ Hash

Payload to send after we are destroyed.

Returns:

  • (Hash)


62
63
64
# File 'lib/deimos/kafka_source.rb', line 62

# Builds the payload published when this record is destroyed: a single
# :payload_key entry holding the record's primary-key value.
# @return [Hash]
def deletion_payload
  key_column = self.class.primary_key
  { payload_key: self[key_column] }
end

#send_kafka_event_on_create ⇒ void

This method returns an undefined value.

Send the newly created model to Kafka.



17
18
19
20
21
22
23
# File 'lib/deimos/kafka_source.rb', line 17

# Sends this record to every producer registered on the class.
# Does nothing unless the record is persisted and the class's kafka_config
# has :create enabled. String columns are truncated first when
# Deimos.config.producers.truncate_columns is set.
# @return [void]
def send_kafka_event_on_create
  return unless persisted? && self.class.kafka_config[:create]

  truncate_columns if Deimos.config.producers.truncate_columns
  self.class.kafka_producers.each { |producer| producer.send_event(self) }
end

#send_kafka_event_on_destroy ⇒ void

This method returns an undefined value.

Send a deletion (null payload) event to Kafka.



49
50
51
52
53
54
55
56
57
58
# File 'lib/deimos/kafka_source.rb', line 49

# Publishes a deletion payload for this record through every producer
# registered on the class. Does nothing unless the class's kafka_config has
# :delete enabled.
# @return [void]
def send_kafka_event_on_destroy
  return unless self.class.kafka_config[:delete]

  self.class.kafka_producers.each do |producer|
    # A producer may supply its own deletion payload; otherwise fall back to
    # the record's own #deletion_payload.
    payload =
      if producer.respond_to?(:generate_deletion_payload)
        producer.generate_deletion_payload(self)
      else
        deletion_payload
      end
    producer.publish_list([payload])
  end
end

#send_kafka_event_on_update ⇒ void

This method returns an undefined value.

Send the newly updated model to Kafka.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/deimos/kafka_source.rb', line 27

# Sends this record to each producer registered on the class, but only to
# producers for which at least one watched attribute actually changed.
# Does nothing unless the class's kafka_config has :update enabled. String
# columns are truncated first when Deimos.config.producers.truncate_columns
# is set.
# @return [void]
def send_kafka_event_on_update
  return unless self.class.kafka_config[:update]

  truncate_columns if Deimos.config.producers.truncate_columns

  self.class.kafka_producers.each do |producer|
    # 'updated_at' alone never counts as a meaningful change.
    watched = producer.watched_attributes(self) - ['updated_at']
    changed = watched.any? do |attribute|
      before_and_after = previous_changes[attribute]
      before_and_after.present? && before_and_after[0] != before_and_after[1]
    end

    producer.send_event(self) if changed
  end
end

#truncate_columns ⇒ Object

Truncates, in place, the value of every string column that is longer than the column's limit.



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/deimos/kafka_source.rb', line 136

# Truncates, in place, every :string column whose value is longer than the
# column's declared limit. Non-string columns, blank values, and columns
# that report no limit are left untouched.
# @return [false] always
def truncate_columns
  self.class.columns.each do |col|
    next unless col.type == :string

    # A column can report a nil limit (unbounded string); there is nothing to
    # truncate to, and comparing a length against nil would raise.
    limit = col.limit
    next if limit.nil?

    value = self[col.name]
    next if value.blank?
    next unless value.to_s.length > limit

    # Start/length slicing also behaves correctly for a limit of 0, where a
    # `0..limit - 1` range would select the whole string.
    self[col.name] = value[0, limit]
  end
  # NOTE(review): the explicit false is preserved from the original; callers
  # may depend on it — confirm before changing.
  false
end