Class: Deimos::ActiveRecordProducer

Inherits:
Producer
  • Object
Defined in:
lib/deimos/active_record_producer.rb

Overview

Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call `send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it's published.

You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.

Constant Summary

Constants inherited from Producer

Producer::MAX_BATCH_SIZE

Class Method Summary

Methods inherited from Producer

config, determine_backend_class, encoder, key_encoder, partition_key, produce_batch, publish, publish_list, topic, watched_attributes

Class Method Details

.generate_payload(attributes, _record) ⇒ Hash

Generate the payload, given a hash of attributes and the record. Can be overridden or added to by subclasses. By default, only attributes whose names match a schema field (plus `payload_key`) are kept.



# File 'lib/deimos/active_record_producer.rb', line 55

def generate_payload(attributes, _record)
  fields = self.encoder.schema_fields
  payload = attributes.stringify_keys
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !fields.map(&:name).include?(k)
  end
  return payload unless Utils::SchemaClass.use?(config.to_h)

  Utils::SchemaClass.instance(payload, config[:schema], config[:namespace])
end
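
The filtering step above can be illustrated without Deimos or ActiveRecord. This is a minimal plain-Ruby sketch, not the library's implementation: `SchemaField` and `filter_payload` are hypothetical names introduced here, and `SchemaField` only stands in for whatever objects `encoder.schema_fields` returns (the sketch assumes they respond to `name`).

```ruby
# Hypothetical stand-in for the objects returned by encoder.schema_fields;
# only #name is needed for the filtering step.
SchemaField = Struct.new(:name)

# Keep only the keys that name a schema field, plus the special
# payload_key entry, mirroring the delete_if condition above.
def filter_payload(attributes, fields)
  field_names = fields.map(&:name)
  payload = attributes.transform_keys(&:to_s)
  payload.select { |key, _| key == 'payload_key' || field_names.include?(key) }
end

fields = [SchemaField.new('id'), SchemaField.new('name')]
attrs = { id: 1, name: 'widget', internal_notes: 'not in schema', payload_key: 1 }
filtered = filter_payload(attrs, fields)
```

Here `internal_notes` is dropped because no schema field carries that name, while `payload_key` survives even though it is not a schema field.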

.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation

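Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. The default query selects rows whose `column_name` value is later than `time_from` and no later than `time_to`, plus rows at exactly `time_from` whose primary key is greater than `min_id`.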



# File 'lib/deimos/active_record_producer.rb', line 74

def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:)
  klass = config[:record_class]
  table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
  column = ActiveRecord::Base.connection.quote_column_name(column_name)
  primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
  klass.where(
    "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
     OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
    time_from,
    min_id,
    time_from,
    time_to
  )
end
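
To make the WHERE clause easier to reason about, the same condition can be written as an in-memory predicate. This is only a sketch under assumptions: `Row` and `in_poll_window?` are hypothetical names, and the real query runs in SQL against the configured record class rather than over Ruby objects.

```ruby
# Hypothetical in-memory row with a primary key and a timestamp column.
Row = Struct.new(:id, :updated_at)

# Mirrors the SQL condition:
#   ((column = time_from AND id > min_id) OR column > time_from)
#   AND column <= time_to
def in_poll_window?(row, time_from:, time_to:, min_id:)
  same_time_higher_id = row.updated_at == time_from && row.id > min_id
  later = row.updated_at > time_from
  (same_time_higher_id || later) && row.updated_at <= time_to
end

t0 = Time.utc(2024, 1, 1, 12, 0, 0)
rows = [
  Row.new(1, t0),       # same timestamp, id not above min_id: skipped
  Row.new(2, t0),       # same timestamp, higher id: selected
  Row.new(3, t0 + 30),  # strictly inside the window: selected
  Row.new(4, t0 + 120)  # after time_to: skipped
]
selected_ids = rows
  .select { |row| in_poll_window?(row, time_from: t0, time_to: t0 + 60, min_id: 1) }
  .map(&:id)
```

The `min_id` tie-breaker lets a poller resume within a batch of rows that share the same timestamp without re-reading rows it has already handled.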

.record_class(klass, refetch: true) ⇒ Object

Indicate the class this producer is working on. If `refetch` is true and a hash is given instead of a record object, refetch the record to pass into the `generate_payload` method.



# File 'lib/deimos/active_record_producer.rb', line 20

def record_class(klass, refetch: true)
  config[:record_class] = klass
  config[:refetch_record] = refetch
end

.send_event(record, force_send: false) ⇒ Object



# File 'lib/deimos/active_record_producer.rb', line 27

def send_event(record, force_send: false)
  send_events([record], force_send: force_send)
end

.send_events(records, force_send: false) ⇒ Object



# File 'lib/deimos/active_record_producer.rb', line 33

def send_events(records, force_send: false)
  primary_key = config[:record_class]&.primary_key
  messages = records.map do |record|
    if record.respond_to?(:attributes)
      attrs = record.attributes.with_indifferent_access
    else
      attrs = record.with_indifferent_access
      if config[:refetch_record] && attrs[primary_key]
        record = config[:record_class].find(attrs[primary_key])
      end
    end
    generate_payload(attrs, record).with_indifferent_access
  end
  self.publish_list(messages, force_send: force_send)
end
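
The branch on `respond_to?(:attributes)` is what lets `send_events` accept both record objects and plain attribute hashes. The following plain-Ruby sketch shows that normalization in isolation. `FakeRecord` and `normalize_attributes` are hypothetical names, and string keys are used here as a simplification in place of ActiveSupport's `with_indifferent_access`.

```ruby
# Hypothetical record-like object exposing #attributes, as an
# ActiveRecord instance would.
FakeRecord = Struct.new(:id, :name) do
  def attributes
    { 'id' => id, 'name' => name }
  end
end

# Accept either a record-like object or a plain hash and return a
# hash with string keys.
def normalize_attributes(record)
  raw = record.respond_to?(:attributes) ? record.attributes : record
  raw.transform_keys(&:to_s)
end

inputs = [FakeRecord.new(1, 'first'), { id: 2, name: 'second' }]
normalized = inputs.map { |record| normalize_attributes(record) }
```

Both inputs end up in the same shape, so the payload generation step does not need to know which form the caller passed in.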