Class: Deimos::ActiveRecordProducer

Inherits:
Producer
  • Object
show all
Defined in:
lib/deimos/active_record_producer.rb

Overview

Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call ‘send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.

You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.

Constant Summary

Constants inherited from Producer

Producer::MAX_BATCH_SIZE

Class Method Summary collapse

Methods inherited from Producer

determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic

Class Method Details

.encoderObject



57
58
59
60
61
# File 'lib/deimos/active_record_producer.rb', line 57

def encoder
  raise "No schema or namespace configured for #{self.name}" if karafka_config.nil?

  karafka_config.deserializers[:payload].backend
end

.generate_deletion_payload(record) ⇒ Hash

Deletion payload for a record by default, delegate to the model’s deletion_payload. Producers may override this method to customize the deletion key/payload per producer.



86
87
88
# File 'lib/deimos/active_record_producer.rb', line 86

def generate_deletion_payload(record)
  record.deletion_payload
end

.generate_payload(attributes, _record) ⇒ Hash

Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/deimos/active_record_producer.rb', line 69

def generate_payload(attributes, _record)
  fields = self.encoder.schema_fields
  payload = attributes.stringify_keys
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !fields.map(&:name).include?(k)
  end
  return payload if self.karafka_config.use_schema_classes.nil? &&
                    !Deimos.config.schema.use_schema_classes

  Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
end

.poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) ⇒ ActiveRecord::Relation

Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/deimos/active_record_producer.rb', line 98

def poll_query(time_from:, time_to:, min_id:, column_name: :updated_at)
  klass = @record_class
  table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
  column = ActiveRecord::Base.connection.quote_column_name(column_name)
  primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
  klass.where(
    "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
     OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
    time_from,
    min_id,
    time_from,
    time_to
  )
end

.post_process(_records) ⇒ Object

Post process records after publishing



115
116
# File 'lib/deimos/active_record_producer.rb', line 115

def post_process(_records)
end

.record_class(klass = nil, refetch: true) ⇒ void

This method returns an undefined value.

Indicate the class this producer is working on. a record object, refetch the record to pass into the ‘generate_payload` method.



21
22
23
24
25
26
# File 'lib/deimos/active_record_producer.rb', line 21

def record_class(klass=nil, refetch: true)
  return @record_class if klass.nil?

  @record_class = klass
  @refetch_record = refetch
end

.send_event(record, force_send: false) ⇒ void



31
32
33
# File 'lib/deimos/active_record_producer.rb', line 31

def send_event(record, force_send: false)
  send_events([record], force_send: force_send)
end

.send_events(records, force_send: false) ⇒ void



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/deimos/active_record_producer.rb', line 38

def send_events(records, force_send: false)
  return if Deimos.producers_disabled?(self)

  primary_key = @record_class&.primary_key
  messages = records.map do |record|
    if record.respond_to?(:attributes)
      attrs = record.attributes.with_indifferent_access
    else
      attrs = record.with_indifferent_access
      if @refetch_record && attrs[primary_key]
        record = @record_class.find(attrs[primary_key])
      end
    end
    generate_payload(attrs, record).with_indifferent_access
  end
  self.publish_list(messages, force_send: force_send)
  self.post_process(records)
end

.watched_attributes(_record) ⇒ Array<String>

Override this in active record producers to add non-schema fields to check for updates



122
123
124
# File 'lib/deimos/active_record_producer.rb', line 122

def watched_attributes(_record)
  self.encoder.schema_fields.map(&:name)
end