Class: TableSync::Receiving::Model::ActiveRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/table_sync/receiving/model/active_record.rb

Defined Under Namespace

Classes: AfterCommitWrap

Instance Method Summary collapse

Constructor Details

#initialize(table_name) ⇒ ActiveRecord



21
22
23
24
25
26
# File 'lib/table_sync/receiving/model/active_record.rb', line 21

def initialize(table_name)
  @raw_model = Class.new(::ActiveRecord::Base) do
    self.table_name = table_name
    self.inheritance_column = nil
  end
end

Instance Method Details

#after_commit(&block) ⇒ Object



115
116
117
# File 'lib/table_sync/receiving/model/active_record.rb', line 115

def after_commit(&block)
  db.add_transaction_record(AfterCommitWrap.new(&block))
end

#columnsObject



28
29
30
# File 'lib/table_sync/receiving/model/active_record.rb', line 28

def columns
  raw_model.column_names.map(&:to_sym)
end

#destroy(data:, target_keys:, version_key:) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/table_sync/receiving/model/active_record.rb', line 85

def destroy(data:, target_keys:, version_key:)
  sanitized_data = data.map { |attr| attr.select { |key, _value| target_keys.include?(key) } }

  query = nil
  sanitized_data.each_with_index do |row, index|
    if index == 0
      query = raw_model.lock("FOR UPDATE").where(row)
    else
      query = query.or(raw_model.lock("FOR UPDATE").where(row))
    end
  end

  result = query.destroy_all.map(&method(:row_to_hash))

  if result.size > data.size
    raise TableSync::DestroyError.new(data: data, target_keys: target_keys, result: result)
  end

  TableSync::Instrument.notify(
    table: model_naming.table, schema: model_naming.schema,
    event: :destroy, count: result.count, direction: :receive
  )

  result
end

#primary_keysObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/table_sync/receiving/model/active_record.rb', line 32

def primary_keys
  db.execute("    SELECT kcu.column_name\n    FROM INFORMATION_SCHEMA.TABLES t\n    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc\n      ON tc.table_catalog = t.table_catalog\n      AND tc.table_schema = t.table_schema\n      AND tc.table_name = t.table_name\n      AND tc.constraint_type = 'PRIMARY KEY'\n    LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu\n      ON kcu.table_catalog = tc.table_catalog\n      AND kcu.table_schema = tc.table_schema\n      AND kcu.table_name = tc.table_name\n      AND kcu.constraint_name = tc.constraint_name\n    WHERE\n      t.table_schema NOT IN ('pg_catalog', 'information_schema')\n      AND t.table_schema = '\#{model_naming.schema}'\n      AND t.table_name = '\#{model_naming.table}'\n    ORDER BY\n      kcu.ordinal_position\n  SQL\nend\n").column_values(0).map(&:to_sym)

#transaction(&block) ⇒ Object



111
112
113
# File 'lib/table_sync/receiving/model/active_record.rb', line 111

def transaction(&block)
  ::ActiveRecord::Base.transaction(&block)
end

#upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/table_sync/receiving/model/active_record.rb', line 55

def upsert(data:, target_keys:, version_key:, default_values:)
  result = data.map do |datum|
    conditions = datum.select { |k| target_keys.include?(k) }

    row = raw_model.lock("FOR NO KEY UPDATE").where(conditions)

    if row.to_a.size > 1
      raise TableSync::UpsertError.new(data: datum, target_keys: target_keys, result: row)
    end

    row = row.first

    if row
      next if datum[version_key] <= row[version_key]

      row.update!(datum)
    else
      create_data = default_values.merge(datum)
      row = raw_model.create!(create_data)
    end

    row_to_hash(row)
  end.compact

  TableSync::Instrument.notify(table: model_naming.table, schema: model_naming.schema,
                               event: :update, count: result.count, direction: :receive)

  result
end