Class: TableSync::Model::ActiveRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/table_sync/model/active_record.rb

Defined Under Namespace

Classes: AfterCommitWrap

Instance Method Summary collapse

Constructor Details

#initialize(table_name) ⇒ ActiveRecord

Returns a new instance of ActiveRecord.



21
22
23
24
25
26
# File 'lib/table_sync/model/active_record.rb', line 21

def initialize(table_name)
  @raw_model = Class.new(::ActiveRecord::Base) do
    self.table_name = table_name
    self.inheritance_column = nil
  end
end

Instance Method Details

#after_commit(&block) ⇒ Object



101
102
103
# File 'lib/table_sync/model/active_record.rb', line 101

def after_commit(&block)
  db.add_transaction_record(AfterCommitWrap.new(&block))
end

#columnsObject



28
29
30
# File 'lib/table_sync/model/active_record.rb', line 28

def columns
  raw_model.column_names.map(&:to_sym)
end

#destroy(data) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/table_sync/model/active_record.rb', line 83

def destroy(data)
  result = transaction do
    row = raw_model.lock("FOR UPDATE").find_by(data)&.destroy!
    [row_to_hash(row)]
  end

  TableSync::Instrument.notify(
    table: model_naming.table, schema: model_naming.schema,
    event: :destroy, count: result.count, direction: :receive
  )

  result
end

#primary_keysObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/table_sync/model/active_record.rb', line 32

def primary_keys
  db.execute(<<~SQL).column_values(0).map(&:to_sym)
    SELECT kcu.column_name
    FROM INFORMATION_SCHEMA.TABLES t
    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
      ON tc.table_catalog = t.table_catalog
      AND tc.table_schema = t.table_schema
      AND tc.table_name = t.table_name
      AND tc.constraint_type = 'PRIMARY KEY'
    LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
      ON kcu.table_catalog = tc.table_catalog
      AND kcu.table_schema = tc.table_schema
      AND kcu.table_name = tc.table_name
      AND kcu.constraint_name = tc.constraint_name
    WHERE
      t.table_schema NOT IN ('pg_catalog', 'information_schema')
      AND t.table_schema = '#{model_naming.schema}'
      AND t.table_name = '#{model_naming.table}'
    ORDER BY
      kcu.ordinal_position
  SQL
end

#transaction(&block) ⇒ Object



97
98
99
# File 'lib/table_sync/model/active_record.rb', line 97

def transaction(&block)
  ::ActiveRecord::Base.transaction(&block)
end

#upsert(data:, target_keys:, version_key:, first_sync_time_key:, default_values:) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/table_sync/model/active_record.rb', line 55

def upsert(data:, target_keys:, version_key:, first_sync_time_key:, default_values:)
  data = Array.wrap(data)

  result = transaction do
    data.map do |datum|
      conditions = datum.select { |k| target_keys.include?(k) }

      row = raw_model.lock("FOR NO KEY UPDATE").find_by(conditions)
      if row
        next if datum[version_key] <= row[version_key]

        row.update!(datum)
      else
        create_data = default_values.merge(datum)
        create_data[first_sync_time_key] = Time.current if first_sync_time_key
        row = raw_model.create!(create_data)
      end

      row_to_hash(row)
    end.compact
  end

  TableSync::Instrument.notify(table: model_naming.table, schema: model_naming.schema,
                               event: :update, count: result.count, direction: :receive)

  result
end