Module: Deimos::KafkaSource

Extended by:
ActiveSupport::Concern
Defined in:
lib/deimos/kafka_source.rb

Overview

Represents an object which needs to inform Kafka when it is saved or bulk imported.

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary

Instance Method Details

#deletion_payload ⇒ Hash

Payload to send after we are destroyed.

Returns:

  • (Hash)


49
50
51
# File 'lib/deimos/kafka_source.rb', line 49

# Builds the payload published after this record is destroyed.
#
# The payload carries a single entry, +:payload_key+, whose value is read
# from this record using the name reported by the class's +primary_key+.
#
# @return [Hash] a hash of the form +{ payload_key: <primary key value> }+
def deletion_payload
  key_value = self[self.class.primary_key]
  { payload_key: key_value }
end

#send_kafka_event_on_create ⇒ Object

Send the newly created model to Kafka.



16
17
18
19
20
21
# File 'lib/deimos/kafka_source.rb', line 16

# Sends this newly created record to every producer configured on the class.
#
# Nothing is sent unless the record reports itself as persisted and the
# class's +kafka_config+ has a truthy +:create+ entry.
#
# @return [Object, nil] the producer collection when events were sent,
#   +nil+ when either guard stops the send
def send_kafka_event_on_create
  # Short-circuits exactly like two sequential guards: the config is only
  # consulted once the record is known to be persisted.
  should_publish = persisted? && self.class.kafka_config[:create]
  return unless should_publish

  self.class.kafka_producers.each do |producer|
    producer.send_event(self)
  end
end

#send_kafka_event_on_destroy ⇒ Object

Send a deletion (null payload) event to Kafka.



41
42
43
44
45
# File 'lib/deimos/kafka_source.rb', line 41

# Publishes a deletion event for this record to every producer configured
# on the class.
#
# Nothing is published unless the class's +kafka_config+ has a truthy
# +:delete+ entry. Each producer receives a one-element list holding this
# record's +deletion_payload+.
#
# @return [Object, nil] the producer collection when events were published,
#   +nil+ when deletion events are disabled
def send_kafka_event_on_destroy
  delete_enabled = self.class.kafka_config[:delete]
  return unless delete_enabled

  # The payload is built inside the loop, so every producer gets its own
  # freshly built hash.
  self.class.kafka_producers.each do |producer|
    producer.publish_list([deletion_payload])
  end
end

#send_kafka_event_on_update ⇒ Object

Send the newly updated model to Kafka.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/deimos/kafka_source.rb', line 24

# Sends this updated record to every producer configured on the class, but
# only when an attribute that a producer watches actually changed.
#
# Nothing is sent unless the class's +kafka_config+ has a truthy +:update+
# entry. The watched attributes are the union of every producer's
# +watched_attributes+, with 'updated_at' excluded.
#
# @return [Object, nil] the producer collection when events were sent,
#   +nil+ when updates are disabled or no watched attribute changed
def send_kafka_event_on_update
  return unless self.class.kafka_config[:update]

  kafka_producers = self.class.kafka_producers
  # A change to 'updated_at' alone never triggers an event.
  watched = kafka_producers.flat_map(&:watched_attributes).uniq - ['updated_at']

  # A change entry counts only if it is present and its first two elements
  # (indices 0 and 1) differ from each other.
  relevant_change = watched.any? do |attr_name|
    change = previous_changes[attr_name]
    change.present? && change[0] != change[1]
  end
  return unless relevant_change

  kafka_producers.each do |producer|
    producer.send_event(self)
  end
end