Class: TableSync::Receiving::Model::ActiveRecord
- Inherits:
-
Object
- Object
- TableSync::Receiving::Model::ActiveRecord
- Defined in:
- lib/table_sync/receiving/model/active_record.rb
Defined Under Namespace
Classes: AfterCommitWrap
Constant Summary collapse
- ISOLATION_LEVELS =
{ uncommitted: :read_uncommitted, committed: :read_committed, repeatable: :repeatable_read, serializable: :serializable, }.freeze
Instance Attribute Summary collapse
-
#schema ⇒ Object
readonly
Returns the value of attribute schema.
-
#table ⇒ Object
readonly
Returns the value of attribute table.
Instance Method Summary collapse
- #after_commit ⇒ Object
- #columns ⇒ Object
- #destroy(data:, target_keys:, version_key:) ⇒ Object
- #find_and_save(keys:) {|entry| ... } ⇒ Object
-
#initialize(table_name) ⇒ ActiveRecord
constructor
A new instance of ActiveRecord.
- #isolation_level(lookup_code) ⇒ Object
- #primary_keys ⇒ Object
- #transaction(**params) ⇒ Object
- #try_advisory_lock(lock_key) ⇒ Object
- #upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object
- #validate_types(data) ⇒ Object
Constructor Details
#initialize(table_name) ⇒ ActiveRecord
Returns a new instance of ActiveRecord.
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 30 def initialize(table_name) @raw_model = Class.new(::ActiveRecord::Base) do self.table_name = table_name self.inheritance_column = nil end @types_validator = TableSync::Utils::Schema::Builder::ActiveRecord.build(@raw_model) model_naming = ::TableSync::NamingResolver::ActiveRecord.new(table_name:) @table = model_naming.table.to_sym @schema = model_naming.schema.to_sym end |
Instance Attribute Details
#schema ⇒ Object (readonly)
Returns the value of attribute schema.
28 29 30 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 28 def schema @schema end |
#table ⇒ Object (readonly)
Returns the value of attribute table.
28 29 30 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 28 def table @table end |
Instance Method Details
#after_commit ⇒ Object
128 129 130 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 128 def after_commit(&) db.add_transaction_record(AfterCommitWrap.new(&)) end |
#columns ⇒ Object
47 48 49 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 47 def columns raw_model.column_names.map(&:to_sym) end |
#destroy(data:, target_keys:, version_key:) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 99 def destroy(data:, target_keys:, version_key:) sanitized_data = data.map { |attr| attr.slice(*target_keys) } query = nil sanitized_data.each_with_index do |row, index| if index == 0 query = raw_model.lock("FOR UPDATE").where(row) else query = query.or(raw_model.lock("FOR UPDATE").where(row)) end end result = query.destroy_all.map { |x| row_to_hash(x) } if result.size > data.size raise TableSync::DestroyError.new(data:, target_keys:, result:) end result end |
#find_and_save(keys:) {|entry| ... } ⇒ Object
140 141 142 143 144 145 146 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 140 def find_and_save(keys:) entry = raw_model.find_by(keys) return unless entry yield entry entry.save! end |
#isolation_level(lookup_code) ⇒ Object
43 44 45 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 43 def isolation_level(lookup_code) ISOLATION_LEVELS.fetch(lookup_code) end |
#primary_keys ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 51 def primary_keys db.execute(" SELECT kcu.column_name\n FROM INFORMATION_SCHEMA.TABLES t\n LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc\n ON tc.table_catalog = t.table_catalog\n AND tc.table_schema = t.table_schema\n AND tc.table_name = t.table_name\n AND tc.constraint_type = 'PRIMARY KEY'\n LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu\n ON kcu.table_catalog = tc.table_catalog\n AND kcu.table_schema = tc.table_schema\n AND kcu.table_name = tc.table_name\n AND kcu.constraint_name = tc.constraint_name\n WHERE\n t.table_schema NOT IN ('pg_catalog', 'information_schema')\n AND t.table_schema = '\#{schema}'\n AND t.table_name = '\#{table}'\n ORDER BY\n kcu.ordinal_position\n SQL\nend\n").column_values(0).map(&:to_sym) |
#transaction(**params) ⇒ Object
124 125 126 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 124 def transaction(**params, &) ::ActiveRecord::Base.transaction(**params, &) end |
#try_advisory_lock(lock_key) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 132 def try_advisory_lock(lock_key) transaction do if db.query_value("SELECT pg_try_advisory_xact_lock(#{lock_key.to_i})") yield end end end |
#upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 74 def upsert(data:, target_keys:, version_key:, default_values:) data.filter_map do |datum| conditions = datum.select { |k| target_keys.include?(k) } row = raw_model.lock("FOR NO KEY UPDATE").where(conditions) if row.to_a.size > 1 raise TableSync::UpsertError.new(data: datum, target_keys:, result: row) end row = row.first if row next if datum[version_key] <= row[version_key] row.update!(datum) else create_data = default_values.merge(datum) row = raw_model.create!(create_data) end row_to_hash(row) end end |
#validate_types(data) ⇒ Object
120 121 122 |
# File 'lib/table_sync/receiving/model/active_record.rb', line 120 def validate_types(data) types_validator.validate(data) end |