Class: TableSync::Receiving::Model::ActiveRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/table_sync/receiving/model/active_record.rb

Defined Under Namespace

Classes: AfterCommitWrap

Constant Summary collapse

# Lookup table from short codes to the isolation-level symbols
# handed back by #isolation_level. Frozen so it cannot be mutated.
ISOLATION_LEVELS = {
  uncommitted:  :read_uncommitted,
  committed:    :read_committed,
  repeatable:   :repeatable_read,
  serializable: :serializable,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(table_name) ⇒ ActiveRecord

Returns a new instance of ActiveRecord.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/table_sync/receiving/model/active_record.rb', line 30

# Builds an anonymous ActiveRecord model bound to +table_name+, a types
# validator for that model, and resolves the table/schema names.
#
# @param table_name [Object] table name assigned to the anonymous model and
#   passed to the naming resolver
def initialize(table_name)
  # Captured in a separately named local so the assignment inside the class
  # body reads unambiguously (the block closes over this variable).
  source_table = table_name

  @raw_model = Class.new(::ActiveRecord::Base) do
    self.table_name = source_table
    # Disable single-table inheritance column handling.
    self.inheritance_column = nil
  end
  @types_validator = TableSync::Utils::Schema::Builder::ActiveRecord.build(@raw_model)

  naming = ::TableSync::NamingResolver::ActiveRecord.new(table_name: table_name)

  @table, @schema = naming.table.to_sym, naming.schema.to_sym
end

Instance Attribute Details

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



28
29
30
# File 'lib/table_sync/receiving/model/active_record.rb', line 28

# Returns the value of the +schema+ attribute (set in the constructor).
def schema = @schema

#table ⇒ Object (readonly)

Returns the value of attribute table.



28
29
30
# File 'lib/table_sync/receiving/model/active_record.rb', line 28

# Returns the value of the +table+ attribute (set in the constructor).
def table = @table

Instance Method Details

#after_commit ⇒ Object



128
129
130
# File 'lib/table_sync/receiving/model/active_record.rb', line 128

# Wraps the given block in an AfterCommitWrap and registers the wrapper
# with +db+ through +add_transaction_record+.
#
# @return [Object] whatever +db.add_transaction_record+ returns
def after_commit(&)
  callback = AfterCommitWrap.new(&)
  db.add_transaction_record(callback)
end

#columns ⇒ Object



47
48
49
# File 'lib/table_sync/receiving/model/active_record.rb', line 47

# Lists the underlying model's column names as symbols.
#
# @return [Array<Symbol>]
def columns
  names = raw_model.column_names
  names.map { |name| name.to_sym }
end

#destroy(data:, target_keys:, version_key:) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/table_sync/receiving/model/active_record.rb', line 99

# Destroys every row matching the target keys of any item in +data+ and
# returns the destroyed rows converted with +row_to_hash+.
#
# Each item is reduced to its +target_keys+ and turned into a locked
# (+FOR UPDATE+) condition; the conditions are OR-ed into a single query.
#
# @param data [Array] items to delete; each must respond to +slice+
# @param target_keys [Array] keys that identify a row
# @param version_key [Object] not used by this method
# @return [Array] +row_to_hash+ of each destroyed record; empty when +data+ is empty
# @raise [TableSync::DestroyError] when more rows were destroyed than items given
def destroy(data:, target_keys:, version_key:)
  # With no items there is no condition to build; previously this fell
  # through to calling +destroy_all+ on nil.
  return [] if data.empty?

  query = data
    .map { |attrs| raw_model.lock("FOR UPDATE").where(attrs.slice(*target_keys)) }
    .reduce(:or)

  result = query.destroy_all.map { |record| row_to_hash(record) }

  # More deleted rows than requested items means the target keys were not unique.
  if result.size > data.size
    raise TableSync::DestroyError.new(data:, target_keys:, result:)
  end

  result
end

#find_and_save(keys:) {|entry| ... } ⇒ Object

Yields:

  • (entry)


140
141
142
143
144
145
146
# File 'lib/table_sync/receiving/model/active_record.rb', line 140

# Finds a single record by +keys+, yields it for modification, then saves it.
# Does nothing (and returns nil) when no record is found.
#
# @param keys [Object] conditions passed to +raw_model.find_by+
# @yield [entry] the found record, before it is saved
# @return [Object, nil] result of +save!+, or nil when nothing matched
def find_and_save(keys:)
  record = raw_model.find_by(keys)

  if record
    yield record
    record.save!
  end
end

#isolation_level(lookup_code) ⇒ Object



43
44
45
# File 'lib/table_sync/receiving/model/active_record.rb', line 43

# Translates a lookup code into its entry in ISOLATION_LEVELS.
#
# @param lookup_code [Object] key into ISOLATION_LEVELS
# @return [Object] the mapped value
# @raise [KeyError] when the code is not present (Hash#fetch without default)
def isolation_level(lookup_code) = ISOLATION_LEVELS.fetch(lookup_code)

#primary_keys ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/table_sync/receiving/model/active_record.rb', line 51

# Looks up the primary-key column names of this model's table.
#
# Queries INFORMATION_SCHEMA for the PRIMARY KEY constraint of the table
# identified by +schema+ and +table+, ordered by the key's ordinal position.
#
# NOTE(review): because of the LEFT JOINs, a table without a primary key
# would presumably yield a NULL column_name — confirm how callers expect
# that case to be handled.
#
# @return [Array<Symbol>] values of the first result column, as symbols
def primary_keys
  db.execute(<<~SQL).column_values(0).map(&:to_sym)
    SELECT kcu.column_name
    FROM INFORMATION_SCHEMA.TABLES t
    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
      ON tc.table_catalog = t.table_catalog
      AND tc.table_schema = t.table_schema
      AND tc.table_name = t.table_name
      AND tc.constraint_type = 'PRIMARY KEY'
    LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
      ON kcu.table_catalog = tc.table_catalog
      AND kcu.table_schema = tc.table_schema
      AND kcu.table_name = tc.table_name
      AND kcu.constraint_name = tc.constraint_name
    WHERE
      t.table_schema NOT IN ('pg_catalog', 'information_schema')
      AND t.table_schema = '#{schema}'
      AND t.table_name = '#{table}'
    ORDER BY
      kcu.ordinal_position
  SQL
end

#transaction(**params) ⇒ Object



124
125
126
# File 'lib/table_sync/receiving/model/active_record.rb', line 124

# Opens a transaction on ::ActiveRecord::Base, forwarding all keyword
# options and the block unchanged.
#
# @param params [Hash] keyword options passed through as-is
# @return [Object] whatever ::ActiveRecord::Base.transaction returns
def transaction(**params, &) = ::ActiveRecord::Base.transaction(**params, &)

#try_advisory_lock(lock_key) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/table_sync/receiving/model/active_record.rb', line 132

# Runs the given block inside a transaction, but only when
# +pg_try_advisory_xact_lock+ reports a truthy value for the key.
#
# @param lock_key [#to_i] lock identifier; coerced with +to_i+ before being
#   embedded in the SQL
# @return [Object, nil] the block's value, or nil when the lock query was falsy
def try_advisory_lock(lock_key)
  transaction do
    acquired = db.query_value("SELECT pg_try_advisory_xact_lock(#{lock_key.to_i})")
    yield if acquired
  end
end

#upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/table_sync/receiving/model/active_record.rb', line 74

# Inserts or updates one row per item in +data+ and returns the rows that
# were actually written, converted with +row_to_hash+.
#
# For each item the matching rows are selected under a
# +FOR NO KEY UPDATE+ lock using only the item's +target_keys+.
# An existing row is updated only when the incoming version is greater than
# the stored one; otherwise the item is skipped. When nothing matches, a new
# row is created from +default_values+ overlaid with the item.
#
# @param data [Array<Hash>] incoming items
# @param target_keys [Array] keys that identify a row
# @param version_key [Object] key holding the comparable version value
# @param default_values [Hash] defaults applied only on create
# @return [Array] +row_to_hash+ of each created or updated record
# @raise [TableSync::UpsertError] when an item matches more than one row
def upsert(data:, target_keys:, version_key:, default_values:)
  data.filter_map do |attrs|
    lookup = attrs.select { |key| target_keys.include?(key) }
    matches = raw_model.lock("FOR NO KEY UPDATE").where(lookup)

    if matches.to_a.size > 1
      raise TableSync::UpsertError.new(data: attrs, target_keys:, result: matches)
    end

    record = matches.first

    if record
      # Stale or duplicate message: drop it from the result.
      next if attrs[version_key] <= record[version_key]

      record.update!(attrs)
    else
      record = raw_model.create!(default_values.merge(attrs))
    end

    row_to_hash(record)
  end
end

#validate_types(data) ⇒ Object



120
121
122
# File 'lib/table_sync/receiving/model/active_record.rb', line 120

# Delegates validation of +data+ to the types validator built in the
# constructor.
#
# @param data [Object] payload handed to +types_validator.validate+
# @return [Object] whatever the validator returns
def validate_types(data) = types_validator.validate(data)