Module: Deimos::ActiveRecordConsume::MessageConsumption

Includes:
Consume::MessageConsumption
Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/message_consumption.rb

Overview

Methods for consuming individual messages and saving them to the database as ActiveRecord instances.

Instance Method Summary

Instance Method Details

#assign_key(record, _payload, key) ⇒ void

This method returns an undefined value.

Assign a key to a new record.



29
30
31
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 29

# Store the message key in the record's primary-key attribute.
# @param record [Object] record to update; its class must expose `primary_key`
# @param _payload [Object] unused here
# @param key [Object] value written to the primary-key attribute as-is
# @return [void]
def assign_key(record, _payload, key)
  pk_column = record.class.primary_key
  record[pk_column] = key
end

#consume_message(message) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 34

# Consume a single message and persist it as a record of the configured class.
#
# Flow:
# 1. Messages rejected by `process_message?` are logged at debug level and skipped.
# 2. The existing record (if any) is looked up with `fetch_record`.
# 3. When `delete_record?` is true the record is handed to `destroy_record`.
# 4. Otherwise a missing record is instantiated and keyed via `assign_key`,
#    populated from `record_attributes`, and saved with `save_record`.
#
# @param message [Object] message exposing `payload`, `key` and `metadata`
# @return [Object, nil] nil when the message is skipped or deleted, otherwise
#   whatever `save_record` returns
def consume_message(message)
  unless self.process_message?(message)
    Deimos::Logging.log_debug(
      message: 'Skipping processing of message',
      payload: message.payload.to_h,
      metadata: Deimos::Logging.metadata_log_text(message.metadata)
    )
    return
  end

  klass = self.class.config[:record_class]
  payload = message.payload
  # Only plain hashes (or a nil payload) are normalized; any other payload
  # object is passed to fetch_record unchanged.
  if payload.is_a?(Hash) || payload.nil?
    payload = payload.to_h.with_indifferent_access
  end
  record = fetch_record(klass, payload, message.key)
  if delete_record?(message)
    destroy_record(record)
    return
  end
  if record.blank?
    record = klass.new
    # NOTE: the raw message payload (not the normalized local) is passed here.
    assign_key(record, message.payload, message.key)
  end

  attrs = record_attributes((message.payload || {}).with_indifferent_access, message.key)
  # don't use attributes= - bypass Rails < 5 attr_protected
  attrs.each do |k, v|
    record.send("#{k}=", v)
  end
  save_record(record)
end

#destroy_record(record) ⇒ void

This method returns an undefined value.

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).



79
80
81
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 79

# Remove a record by calling `destroy` on it. A nil record is a no-op.
# Override to do something other than a straight destroy (e.g. archiving).
# @param record [Object, nil] record to destroy, or nil when none was found
# @return [void]
def destroy_record(record)
  return if record.nil?

  record.destroy
end

#fetch_record(klass, _payload, key) ⇒ ActiveRecord::Base

Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.



19
20
21
22
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 19

# Look up the record matching the message key on the class's primary-key column.
#
# A Hash key with exactly one entry is unwrapped to that entry's value; any
# other key (scalar, empty Hash, multi-field Hash) is used as-is.
# @param klass [Class] model class exposing `unscoped` and `primary_key`
# @param _payload [Object] unused here
# @param key [Object] message key
# @return [Object, nil] the first matching record, as returned by `first`
def fetch_record(klass, _payload, key)
  single_field_key = key.is_a?(Hash) && key.size == 1
  lookup_value =
    if single_field_key
      key.values.first
    else
      key
    end

  scope = klass.unscoped
  scope.where(klass.primary_key => lookup_value).first
end

#save_record(record) ⇒ void



69
70
71
72
73
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 69

# Stamp timestamps on the record and persist it with `save!`.
#
# `created_at` is only filled in when it is not already set; `updated_at` is
# always overwritten. Each attribute is touched only if the record responds to it.
# @param record [Object] record to save
# @return [Object] whatever `save!` returns
def save_record(record)
  if record.respond_to?(:created_at)
    record.created_at = Time.zone.now unless record.created_at
  end

  if record.respond_to?(:updated_at)
    record.updated_at = Time.zone.now
  end

  record.save!
end