Module: Deimos::KafkaSource
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/deimos/kafka_source.rb
Overview
Represents an object which needs to inform Kafka when it is saved or bulk imported.
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
-
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
-
#send_kafka_event_on_create ⇒ void
Send the newly created model to Kafka.
-
#send_kafka_event_on_destroy ⇒ void
Send a deletion (null payload) event to Kafka.
-
#send_kafka_event_on_update ⇒ void
Send the newly updated model to Kafka.
-
#truncate_columns ⇒ Object
check if any field has value longer than the field limit.
Instance Method Details
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
54 55 56 |
# File 'lib/deimos/kafka_source.rb', line 54 def deletion_payload { payload_key: self[self.class.primary_key] } end |
#send_kafka_event_on_create ⇒ void
This method returns an undefined value.
Send the newly created model to Kafka.
17 18 19 20 21 22 23 |
# File 'lib/deimos/kafka_source.rb', line 17 def send_kafka_event_on_create return unless self.persisted? return unless self.class.kafka_config[:create] self.truncate_columns if Deimos.config.producers.truncate_columns self.class.kafka_producers.each { |p| p.send_event(self) } end |
#send_kafka_event_on_destroy ⇒ void
This method returns an undefined value.
Send a deletion (null payload) event to Kafka.
46 47 48 49 50 |
# File 'lib/deimos/kafka_source.rb', line 46 def send_kafka_event_on_destroy return unless self.class.kafka_config[:delete] self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) } end |
#send_kafka_event_on_update ⇒ void
This method returns an undefined value.
Send the newly updated model to Kafka.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/deimos/kafka_source.rb', line 27 def send_kafka_event_on_update return unless self.class.kafka_config[:update] producers = self.class.kafka_producers fields = producers.flat_map { |p| p.watched_attributes(self) }.uniq fields -= ['updated_at'] # Only send an event if a field we care about was changed. any_changes = fields.any? do |field| field_change = self.previous_changes[field] field_change.present? && field_change[0] != field_change[1] end return unless any_changes self.truncate_columns if Deimos.config.producers.truncate_columns producers.each { |p| p.send_event(self) } end |
#truncate_columns ⇒ Object
check if any field has value longer than the field limit
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/deimos/kafka_source.rb', line 127 def truncate_columns self.class.columns.each do |col| next unless col.type == :string next if self[col.name].blank? if self[col.name].to_s.length > col.limit self[col.name] = self[col.name][0..col.limit - 1] end end false end |