Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call ‘send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary collapse
- .encoder ⇒ Object
-
.generate_deletion_payload(record) ⇒ Hash
Deletion payload for a record by default, delegate to the model’s deletion_payload.
-
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record..
-
.poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller.
-
.post_process(_records) ⇒ Object
Post process records after publishing.
-
.record_class(klass = nil, refetch: true) ⇒ void
Indicate the class this producer is working on.
- .send_event(record, force_send: false) ⇒ void
- .send_events(records, force_send: false) ⇒ void
-
.watched_attributes(_record) ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates.
Methods inherited from Producer
determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic
Class Method Details
.encoder ⇒ Object
57 58 59 60 61 |
# File 'lib/deimos/active_record_producer.rb', line 57 def encoder raise "No schema or namespace configured for #{self.name}" if karafka_config.nil? karafka_config.deserializers[:payload].backend end |
.generate_deletion_payload(record) ⇒ Hash
Deletion payload for a record by default, delegate to the model’s deletion_payload. Producers may override this method to customize the deletion key/payload per producer.
86 87 88 |
# File 'lib/deimos/active_record_producer.rb', line 86 def generate_deletion_payload(record) record.deletion_payload end |
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/deimos/active_record_producer.rb', line 69 def generate_payload(attributes, _record) fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| k.to_sym != :payload_key && !fields.map(&:name).include?(k) end return payload if self.karafka_config.use_schema_classes.nil? && !Deimos.config.schema.use_schema_classes Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace) end |
.poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/deimos/active_record_producer.rb', line 98 def poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) klass = @record_class table = ActiveRecord::Base.connection.quote_table_name(klass.table_name) column = ActiveRecord::Base.connection.quote_column_name(column_name) primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key) klass.where( "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \ OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?", time_from, min_id, time_from, time_to ) end |
.post_process(_records) ⇒ Object
Post process records after publishing
115 116 |
# File 'lib/deimos/active_record_producer.rb', line 115 def post_process(_records) end |
.record_class(klass = nil, refetch: true) ⇒ void
This method returns an undefined value.
Indicate the class this producer is working on. a record object, refetch the record to pass into the ‘generate_payload` method.
21 22 23 24 25 26 |
# File 'lib/deimos/active_record_producer.rb', line 21 def record_class(klass=nil, refetch: true) return @record_class if klass.nil? @record_class = klass @refetch_record = refetch end |
.send_event(record, force_send: false) ⇒ void
31 32 33 |
# File 'lib/deimos/active_record_producer.rb', line 31 def send_event(record, force_send: false) send_events([record], force_send: force_send) end |
.send_events(records, force_send: false) ⇒ void
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/deimos/active_record_producer.rb', line 38 def send_events(records, force_send: false) return if Deimos.producers_disabled?(self) primary_key = @record_class&.primary_key = records.map do |record| if record.respond_to?(:attributes) attrs = record.attributes.with_indifferent_access else attrs = record.with_indifferent_access if @refetch_record && attrs[primary_key] record = @record_class.find(attrs[primary_key]) end end generate_payload(attrs, record).with_indifferent_access end self.publish_list(, force_send: force_send) self.post_process(records) end |
.watched_attributes(_record) ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates
122 123 124 |
# File 'lib/deimos/active_record_producer.rb', line 122 def watched_attributes(_record) self.encoder.schema_fields.map(&:name) end |