Module: Deimos::KafkaSource
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/deimos/kafka_source.rb
Overview
Represents an object which needs to inform Kafka when it is saved or bulk imported.
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
-
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
-
#send_kafka_event_on_create ⇒ void
Send the newly created model to Kafka.
-
#send_kafka_event_on_destroy ⇒ void
Send a deletion (null payload) event to Kafka.
-
#send_kafka_event_on_update ⇒ void
Send the newly updated model to Kafka.
-
#truncate_columns ⇒ Object
Checks whether any string field has a value longer than the field limit and truncates it to fit.
Instance Method Details
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
62 63 64 |
# File 'lib/deimos/kafka_source.rb', line 62

# Payload to send after we are destroyed.
#
# Looks up the name of the primary key on the record's class and reads that
# attribute from the record itself.
#
# @return [Hash] a single-entry hash with the primary key value stored
#   under +:payload_key+
def deletion_payload
  pk_name = self.class.primary_key
  { payload_key: self[pk_name] }
end
#send_kafka_event_on_create ⇒ void
This method returns an undefined value.
Send the newly created model to Kafka.
17 18 19 20 21 22 23 |
# File 'lib/deimos/kafka_source.rb', line 17

# Send the newly created model to Kafka.
#
# Nothing is sent when the record is not persisted or when the class-level
# Kafka config has no truthy +:create+ entry. When the
# +Deimos.config.producers.truncate_columns+ setting is truthy, the record's
# columns are truncated before it is handed to each producer.
#
# @return [void]
def send_kafka_event_on_create
  # Short-circuits in the same order as two separate guards would:
  # persistence first, then the :create switch.
  return unless persisted? && self.class.kafka_config[:create]

  truncate_columns if Deimos.config.producers.truncate_columns

  self.class.kafka_producers.each do |producer|
    producer.send_event(self)
  end
end
#send_kafka_event_on_destroy ⇒ void
This method returns an undefined value.
Send a deletion (null payload) event to Kafka.
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/deimos/kafka_source.rb', line 49

# Send a deletion (null payload) event to Kafka.
#
# Does nothing unless the class-level Kafka config has a truthy +:delete+
# entry. For each producer, the payload comes from the producer's own
# +generate_deletion_payload+ when it responds to that method, and from this
# record's +deletion_payload+ otherwise. The payload is published as a
# one-element list.
#
# @return [void]
def send_kafka_event_on_destroy
  return unless self.class.kafka_config[:delete]

  self.class.kafka_producers.each do |producer|
    payload =
      if producer.respond_to?(:generate_deletion_payload)
        producer.generate_deletion_payload(self)
      else
        deletion_payload
      end
    producer.publish_list([payload])
  end
end
#send_kafka_event_on_update ⇒ void
This method returns an undefined value.
Send the newly updated model to Kafka.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/deimos/kafka_source.rb', line 27

# Send the newly updated model to Kafka.
#
# Does nothing unless the class-level Kafka config has a truthy +:update+
# entry. When +Deimos.config.producers.truncate_columns+ is truthy, the
# record's columns are truncated first. Each producer then receives the
# record only if at least one attribute it watches (ignoring 'updated_at')
# has a previous change whose old and new values differ.
#
# @return [void]
def send_kafka_event_on_update
  return unless self.class.kafka_config[:update]

  truncate_columns if Deimos.config.producers.truncate_columns

  self.class.kafka_producers.each do |producer|
    # A bump of the timestamp alone is not worth an event.
    watched = producer.watched_attributes(self) - ['updated_at']

    # Only send an event if a field we care about was changed.
    relevant_change = watched.any? do |attr_name|
      change = previous_changes[attr_name]
      change.present? && change[0] != change[1]
    end

    producer.send_event(self) if relevant_change
  end
end
#truncate_columns ⇒ Object
Checks whether any string field has a value longer than the field limit and truncates it to fit.
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/deimos/kafka_source.rb', line 136

# Truncates every string column whose value is longer than the column's
# declared limit, so that the stored value fits within that limit.
#
# Columns are left untouched when they are not of type +:string+, when they
# declare no limit (+nil+), or when their current value is blank.
#
# @return [false] always
def truncate_columns
  self.class.columns.each do |col|
    next unless col.type == :string

    limit = col.limit
    # A string column without a declared limit has nothing to truncate to,
    # and comparing a length against nil would raise ArgumentError.
    next if limit.nil?

    value = self[col.name]
    next if value.blank?
    next unless value.to_s.length > limit

    self[col.name] = value[0, limit]
  end
  false
end