Module: Deimos::ActiveRecordConsume::MessageConsumption

Includes:
Consume::MessageConsumption
Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/message_consumption.rb

Overview

Methods for consuming individual messages and saving them to the database as ActiveRecord instances.

Instance Method Summary

Instance Method Details

#assign_key(record, _payload, key) ⇒ void

This method returns an undefined value.

Assign a key to a new record.



28
29
30
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 28

# Assign a key to a new record by writing it into the attribute named by the
# record class's primary key.
#
# A Hash key holding exactly one entry is unwrapped to its sole value first,
# the same way fetch_record resolves its lookup key, so the record is created
# under the value it will later be fetched by. Any other key is assigned
# unchanged.
#
# @param record [Object] record supporting `[]=` whose class responds to `primary_key`
# @param _payload [Object] unused by this implementation
# @param key [Object] message key
# @return [void]
def assign_key(record, _payload, key)
  key_value = key.is_a?(Hash) && key.size == 1 ? key.values.first : key
  record[record.class.primary_key] = key_value
end

#consume_message(message) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 33

# Process a single message against the configured record class.
#
# Messages rejected by process_message? are logged at debug level and
# skipped. A nil payload destroys the fetched record via destroy_record.
# Any other payload is written onto the fetched record (or onto a freshly
# built one when nothing was found) and persisted via save_record.
#
# @param message [Object] message responding to `payload`, `key` and `metadata`
# @return [Object] nil on the skip and destroy paths, otherwise the result of save_record
def consume_message(message)
  unless process_message?(message)
    # NOTE(review): `metadata_log_text` and `message.metadata` were garbled in
    # the extracted source and have been reconstructed - confirm the names
    # against Deimos::Logging.
    Deimos::Logging.log_debug(
      message: 'Skipping processing of message',
      payload: message.payload.to_h,
      metadata: Deimos::Logging.metadata_log_text(message.metadata)
    )
    return
  end

  payload = message.payload
  klass = self.class.config[:record_class]
  # nil.to_h is {}, so the lookup is safe even when the payload is nil.
  record = fetch_record(klass, payload.to_h.with_indifferent_access, message.key)
  if payload.nil?
    destroy_record(record)
    return
  end

  if record.blank?
    record = klass.new
    assign_key(record, payload, message.key)
  end

  attrs = record_attributes(payload.with_indifferent_access, message.key)
  # Go through the individual setters instead of attributes= to bypass
  # Rails < 5 attr_protected.
  attrs.each { |name, value| record.send("#{name}=", value) }
  save_record(record)
end

#destroy_record(record) ⇒ void

This method returns an undefined value.

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).



74
75
76
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 74

# Destroy a record whose message arrived with a null payload. Does nothing
# when no record was found (record is nil).
#
# @param record [Object, nil] record responding to `destroy`, or nil
# @return [void]
def destroy_record(record)
  return if record.nil?

  record.destroy
end

#fetch_record(klass, _payload, key) ⇒ ActiveRecord::Base

Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.



18
19
20
21
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 18

# Look up the record for a message key using the class's primary-key column,
# ignoring any default scope on the class.
#
# A Hash key with exactly one entry is reduced to that entry's value; every
# other key is used for the lookup as given.
#
# @param klass [Class] record class responding to `unscoped` and `primary_key`
# @param _payload [Object] unused by this implementation
# @param key [Object] message key
# @return [ActiveRecord::Base] first record matching the key
def fetch_record(klass, _payload, key)
  lookup = key
  lookup = key.values.first if key.is_a?(Hash) && key.size == 1
  klass.unscoped.where(klass.primary_key => lookup).first
end

#save_record(record) ⇒ void



64
65
66
67
68
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 64

# Stamp the record's timestamp attributes (when it has them) and persist it
# with save!.
#
# The clock is read once so created_at and updated_at agree on a new record,
# and Time.current is used so the call still works when no time zone is
# configured (Time.zone is nil), where Time.zone.now would raise.
#
# @param record [Object] record responding to `save!`
# @return [Object] whatever `record.save!` returns
def save_record(record)
  now = Time.current
  # An existing created_at is preserved; updated_at is always refreshed.
  record.created_at ||= now if record.respond_to?(:created_at)
  record.updated_at = now if record.respond_to?(:updated_at)
  record.save!
end