Class: TableSync::Receiving::Model::ActiveRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/table_sync/receiving/model/active_record.rb

Defined Under Namespace

Classes: AfterCommitWrap

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(table_name) ⇒ ActiveRecord

Returns a new instance of ActiveRecord.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/table_sync/receiving/model/active_record.rb', line 23

# Builds an anonymous ActiveRecord model bound to +table_name+ and records
# the table and schema names reported by the naming resolver.
#
# @param table_name [String] table name handed both to the anonymous model
#   and to TableSync::NamingResolver::ActiveRecord
def initialize(table_name)
  anonymous_model = Class.new(::ActiveRecord::Base)
  anonymous_model.table_name = table_name
  # A nil inheritance column is assigned so the model gets no STI column.
  anonymous_model.inheritance_column = nil
  @raw_model = anonymous_model

  resolver = ::TableSync::NamingResolver::ActiveRecord.new(table_name: table_name)

  @table = resolver.table.to_sym
  @schema = resolver.schema.to_sym
end

Instance Attribute Details

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



21
22
23
# File 'lib/table_sync/receiving/model/active_record.rb', line 21

# Reader for the @schema instance variable, which #initialize sets to the
# symbolized schema name reported by the naming resolver.
#
# @return [Symbol] the value of @schema
def schema
  @schema
end

#table ⇒ Object (readonly)

Returns the value of attribute table.



21
22
23
# File 'lib/table_sync/receiving/model/active_record.rb', line 21

# Reader for the @table instance variable, which #initialize sets to the
# symbolized table name reported by the naming resolver.
#
# @return [Symbol] the value of @table
def table
  @table
end

Instance Method Details

#after_commit(&block) ⇒ Object



112
113
114
# File 'lib/table_sync/receiving/model/active_record.rb', line 112

# Wraps +block+ in an AfterCommitWrap and hands the wrapper to the
# connection's +add_transaction_record+.
#
# @yield the callback stored inside the AfterCommitWrap
# @return [Object] whatever +add_transaction_record+ returns
def after_commit(&block)
  connection = db
  wrapped_callback = AfterCommitWrap.new(&block)
  connection.add_transaction_record(wrapped_callback)
end

#columns ⇒ Object



35
36
37
# File 'lib/table_sync/receiving/model/active_record.rb', line 35

# Lists the column names of the underlying model as symbols.
#
# @return [Array<Symbol>] +raw_model.column_names+, each converted with +to_sym+
def columns
  names = raw_model.column_names
  names.map { |name| name.to_sym }
end

#destroy(data:, target_keys:, version_key:) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/table_sync/receiving/model/active_record.rb', line 87

# Deletes the rows matching +data+ and returns the deleted rows as hashes.
#
# Each entry of +data+ is reduced to its +target_keys+ attributes, turned into
# a "FOR UPDATE"-locked +where+ scope, and all scopes are OR-ed into a single
# relation on which +destroy_all+ is called.
#
# @param data [Array<Hash>] attribute hashes describing the rows to delete
# @param target_keys [Array] keys of each hash used as match conditions
# @param version_key [Object] accepted but not used by this implementation
# @return [Array] +row_to_hash+ of every destroyed record; empty when +data+ is empty
# @raise [TableSync::DestroyError] when more rows were destroyed than entries given
def destroy(data:, target_keys:, version_key:)
  # With no entries there is no scope to build; previously the query stayed
  # nil and the `destroy_all` call below raised NoMethodError.
  return [] if data.empty?

  scopes = data.map do |attributes|
    conditions = attributes.select { |key, _value| target_keys.include?(key) }
    raw_model.lock("FOR UPDATE").where(conditions)
  end

  query = scopes.reduce { |combined, scope| combined.or(scope) }

  result = query.destroy_all.map { |record| row_to_hash(record) }

  # More deleted rows than inputs means some target keys matched several rows.
  if result.size > data.size
    raise TableSync::DestroyError.new(data: data, target_keys: target_keys, result: result)
  end

  result
end

#primary_keys ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/table_sync/receiving/model/active_record.rb', line 39

# Looks up the primary key columns of the synced table through the
# INFORMATION_SCHEMA views, ordered by their position within the key.
#
# NOTE(review): +schema+ and +table+ are interpolated into the SQL text
# without quoting, so they are assumed to be trusted identifiers.
#
# @return [Array<Symbol>] values of the first result column, as symbols
def primary_keys
  connection = db

  sql = <<~SQL
    SELECT kcu.column_name
    FROM INFORMATION_SCHEMA.TABLES t
    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
      ON tc.table_catalog = t.table_catalog
      AND tc.table_schema = t.table_schema
      AND tc.table_name = t.table_name
      AND tc.constraint_type = 'PRIMARY KEY'
    LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
      ON kcu.table_catalog = tc.table_catalog
      AND kcu.table_schema = tc.table_schema
      AND kcu.table_name = tc.table_name
      AND kcu.constraint_name = tc.constraint_name
    WHERE
      t.table_schema NOT IN ('pg_catalog', 'information_schema')
      AND t.table_schema = '#{schema}'
      AND t.table_name = '#{table}'
    ORDER BY
      kcu.ordinal_position
  SQL

  key_columns = connection.execute(sql).column_values(0)
  key_columns.map { |column_name| column_name.to_sym }
end

#transaction(&block) ⇒ Object



108
109
110
# File 'lib/table_sync/receiving/model/active_record.rb', line 108

# Runs +block+ through ::ActiveRecord::Base.transaction.
#
# @yield the work handed to ::ActiveRecord::Base.transaction
# @return [Object] whatever ::ActiveRecord::Base.transaction returns
def transaction(&block)
  base_class = ::ActiveRecord::Base
  base_class.transaction(&block)
end

#upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/table_sync/receiving/model/active_record.rb', line 62

# Inserts or updates one row per entry of +data+.
#
# For every entry, the row matching its +target_keys+ attributes is looked up
# under a "FOR NO KEY UPDATE" lock. A missing row is created from
# +default_values+ merged with the entry; an existing row is updated only when
# the entry's +version_key+ value is greater than the stored one.
#
# @param data [Array<Hash>] attribute hashes to write
# @param target_keys [Array] keys of each hash used to find the existing row
# @param version_key [Object] key whose value decides whether an update applies
# @param default_values [Hash] attributes merged under the entry on creation
# @return [Array] +row_to_hash+ of each created or updated row; entries that
#   were skipped as not newer are left out
# @raise [TableSync::UpsertError] when the target keys match more than one row
def upsert(data:, target_keys:, version_key:, default_values:)
  data.filter_map do |datum|
    lookup = datum.select { |key, _value| target_keys.include?(key) }
    matches = raw_model.lock("FOR NO KEY UPDATE").where(lookup)

    if matches.to_a.size > 1
      raise TableSync::UpsertError.new(data: datum, target_keys: target_keys, result: matches)
    end

    existing = matches.first

    # No row yet: create it, letting the entry override the defaults.
    unless existing
      created = raw_model.create!(default_values.merge(datum))
      next row_to_hash(created)
    end

    # Not newer than what is stored: skip (filter_map drops the nil).
    next if datum[version_key] <= existing[version_key]

    existing.update!(datum)
    row_to_hash(existing)
  end
end