Class: Deimos::ActiveRecordConsumer

Inherits:
Consumer
  • Object
show all
Defined in:
lib/deimos/active_record_consumer.rb

Overview

Consumer that automatically saves the payload into the database.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Consumer

#around_consume, #before_consume, #decode_key, decoder, key_decoder

Class Method Details

.record_class(klass) ⇒ Object

param klass [Class < ActiveRecord::Base] the class used to save to the database.



11
12
13
# File 'lib/deimos/active_record_consumer.rb', line 11

def record_class(klass)
  config[:record_class] = klass
end

Instance Method Details

#consume(payload, metadata) ⇒ Object

:nodoc:



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/deimos/active_record_consumer.rb', line 17

def consume(payload, )
  key = .with_indifferent_access[:key]
  klass = self.class.config[:record_class]
  record = klass.where(klass.primary_key => key).first
  if payload.nil?
    destroy_record(record)
    return
  end
  record ||= klass.new
  attrs = record_attributes(payload.with_indifferent_access)
  # don't use attributes= - bypass Rails < 5 attr_protected
  attrs.each do |k, v|
    record.send("#{k}=", v)
  end
  record[klass.primary_key] = key
  record.created_at ||= Time.zone.now if record.respond_to?(:created_at)
  record.updated_at ||= Time.zone.now if record.respond_to?(:updated_at)
  record.save!
end

#destroy_record(record) ⇒ Object

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).

Parameters:

  • record (ActiveRecord::Base)


40
41
42
# File 'lib/deimos/active_record_consumer.rb', line 40

def destroy_record(record)
  record&.destroy
end

#record_attributes(payload) ⇒ Object

Override this method (with ‘super`) if you want to add/change the default attributes set to the new/existing record.

Parameters:

  • payload (Hash)


47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/deimos/active_record_consumer.rb', line 47

def record_attributes(payload)
  klass = self.class.config[:record_class]
  attributes = {}
  schema = self.class.decoder.avro_schema
  schema.fields.each do |field|
    column = klass.columns.find { |c| c.name == field.name }
    next if column.nil?
    next if %w(updated_at created_at).include?(field.name)

    attributes[field.name] = _coerce_field(field, column, payload[field.name])
  end
  attributes
end