Module: Deimos::KafkaSource

Extended by:
ActiveSupport::Concern
Defined in:
lib/deimos/kafka_source.rb

Overview

Represents an object which needs to inform Kafka when it is saved or bulk imported.

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

# Notice text steering callers from `kafka_producer` to `kafka_producers`.
DEPRECATION_WARNING =
  'The kafka_producer interface will be deprecated in future releases. ' \
  'Please use kafka_producers instead.'

Instance Method Summary collapse

Instance Method Details

#deletion_payload ⇒ Hash

Payload to send after we are destroyed.

Returns:

  • (Hash)


52
53
54
# File 'lib/deimos/kafka_source.rb', line 52

# Builds the payload sent after this record is destroyed: the value of the
# record's primary-key attribute, stored under `:payload_key`.
#
# @return [Hash] single-entry hash of the form `{ payload_key: <primary key value> }`
def deletion_payload
  key_attribute = self.class.primary_key
  { payload_key: self[key_attribute] }
end

#send_kafka_event_on_create ⇒ Object

Send the newly created model to Kafka.



19
20
21
22
23
24
# File 'lib/deimos/kafka_source.rb', line 19

# Hands this newly created record to every producer registered on its class.
#
# Nothing is sent unless the record reports itself as persisted and the
# class-level `kafka_config` has a truthy `:create` entry.
#
# @return [Object, nil] the producer collection when events were sent, otherwise nil
def send_kafka_event_on_create
  model = self.class
  should_publish = persisted? && model.kafka_config[:create]
  return unless should_publish

  model.kafka_producers.each { |producer| producer.send_event(self) }
end

#send_kafka_event_on_destroy ⇒ Object

Send a deletion (null payload) event to Kafka.



44
45
46
47
48
# File 'lib/deimos/kafka_source.rb', line 44

# Publishes this record's deletion payload through every producer registered
# on its class.
#
# Nothing is sent unless the class-level `kafka_config` has a truthy
# `:delete` entry.
#
# @return [Object, nil] the producer collection when events were sent, otherwise nil
def send_kafka_event_on_destroy
  model = self.class
  return unless model.kafka_config[:delete]

  model.kafka_producers.each do |producer|
    # Built inside the loop so each producer gets its own payload hash.
    payload = deletion_payload
    producer.publish_list([payload])
  end
end

#send_kafka_event_on_update ⇒ Object

Send the newly updated model to Kafka.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/deimos/kafka_source.rb', line 27

# Hands this updated record to every producer registered on its class, but
# only when at least one watched attribute actually changed value.
#
# Nothing is sent unless the class-level `kafka_config` has a truthy
# `:update` entry. The watched set is the union of every producer's
# `watched_attributes`, with 'updated_at' excluded.
#
# @return [Object, nil] the producer collection when events were sent, otherwise nil
def send_kafka_event_on_update
  model = self.class
  return unless model.kafka_config[:update]

  targets = model.kafka_producers
  watched = targets.flat_map(&:watched_attributes).uniq - ['updated_at']
  # Skip the event when no watched attribute has a recorded change whose
  # before and after values differ.
  nothing_relevant = watched.none? do |attribute|
    delta = previous_changes[attribute]
    delta.present? && delta[0] != delta[1]
  end
  return if nothing_relevant

  targets.each { |producer| producer.send_event(self) }
end