Module: Deimos::KafkaSource
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/deimos/kafka_source.rb
Overview
Represents an object which needs to inform Kafka when it is saved or bulk imported.
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
-
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
-
#send_kafka_event_on_create ⇒ Object
Send the newly created model to Kafka.
-
#send_kafka_event_on_destroy ⇒ Object
Send a deletion (null payload) event to Kafka.
-
#send_kafka_event_on_update ⇒ Object
Send the newly updated model to Kafka.
Instance Method Details
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
49 50 51 |
# File 'lib/deimos/kafka_source.rb', line 49 def deletion_payload { payload_key: self[self.class.primary_key] } end |
#send_kafka_event_on_create ⇒ Object
Send the newly created model to Kafka.
16 17 18 19 20 21 |
# File 'lib/deimos/kafka_source.rb', line 16 def send_kafka_event_on_create return unless self.persisted? return unless self.class.kafka_config[:create] self.class.kafka_producers.each { |p| p.send_event(self) } end |
#send_kafka_event_on_destroy ⇒ Object
Send a deletion (null payload) event to Kafka.
41 42 43 44 45 |
# File 'lib/deimos/kafka_source.rb', line 41 def send_kafka_event_on_destroy return unless self.class.kafka_config[:delete] self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) } end |
#send_kafka_event_on_update ⇒ Object
Send the newly updated model to Kafka.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/deimos/kafka_source.rb', line 24 def send_kafka_event_on_update return unless self.class.kafka_config[:update] producers = self.class.kafka_producers fields = producers.flat_map(&:watched_attributes).uniq fields -= ['updated_at'] # Only send an event if a field we care about was changed. any_changes = fields.any? do |field| field_change = self.previous_changes[field] field_change.present? && field_change[0] != field_change[1] end return unless any_changes producers.each { |p| p.send_event(self) } end |