Class: TableSync::Receiving::Model::Sequel

Inherits:
Object
  • Object
show all
Defined in:
lib/table_sync/receiving/model/sequel.rb

Constant Summary collapse

# Lookup codes accepted by #isolation_level, each mapped to the symbol that is
# returned for it. Every code maps to itself.
ISOLATION_LEVELS =
  %i[uncommitted committed repeatable serializable].to_h { |level| [level, level] }.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(table_name) ⇒ Sequel

Returns a new instance of Sequel.



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/table_sync/receiving/model/sequel.rb', line 14

# Builds an anonymous Sequel model class for +table_name+ and derives the
# schema validator and the table/schema names from it.
#
# @param table_name [Object] table identifier handed to +::Sequel::Model()+
#   and to the naming resolver (exact accepted types are not visible here)
def initialize(table_name)
  anonymous_model = Class.new(::Sequel::Model(table_name))
  # Called on the fresh model class before it is stored; see Sequel's
  # documentation for what lifting the primary-key restriction permits.
  anonymous_model.unrestrict_primary_key
  @raw_model = anonymous_model

  @types_validator = TableSync::Utils::Schema::Builder::Sequel.build(@raw_model)

  resolver = ::TableSync::NamingResolver::Sequel.new(table_name:, db: @raw_model.db)

  # Both names are stored as symbols.
  @table = resolver.table.to_sym
  @schema = resolver.schema.to_sym
end

Instance Attribute Details

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



5
6
7
# File 'lib/table_sync/receiving/model/sequel.rb', line 5

# @return [Symbol] the schema name stored in +@schema+ by the constructor
def schema = @schema

#table ⇒ Object (readonly)

Returns the value of attribute table.



5
6
7
# File 'lib/table_sync/receiving/model/sequel.rb', line 5

# @return [Symbol] the table name stored in +@table+ by the constructor
def table = @table

Instance Method Details

#after_commit(&) ⇒ Object



75
76
77
# File 'lib/table_sync/receiving/model/sequel.rb', line 75

# Forwards the given block to +db.after_commit+.
#
# @yield the block handed on unchanged to +db.after_commit+
# @return [Object] whatever +db.after_commit+ returns
def after_commit(&block)
  db.after_commit(&block)
end

#columns ⇒ Object



31
32
33
# File 'lib/table_sync/receiving/model/sequel.rb', line 31

# @return [Object] the value of +dataset.columns+ (presumably the column
#   names of the underlying table — confirm against the dataset helper)
def columns = dataset.columns

#destroy(data:, target_keys:, version_key:) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/table_sync/receiving/model/sequel.rb', line 55

# Deletes the rows matching the +target_keys+ of each entry in +data+ and
# returns what the delete produced.
#
# @param data [Array<Hash>] incoming rows; only +target_keys+ of each are used
# @param target_keys [Array<Symbol>] keys that identify a row
# @param version_key [Symbol] accepted for interface parity; not used here
# @return [Array] result of the +returning+ delete, or +[]+ for empty +data+
# @raise [TableSync::DestroyError] when more rows were deleted than entries
#   were supplied
def destroy(data:, target_keys:, version_key:)
  # Nothing to match: skip the query rather than OR-ing zero conditions.
  return [] if data.empty?

  key_conditions = type_cast(data.map { |row| row.slice(*target_keys) })
  result = dataset.returning.where(::Sequel.|(*key_conditions)).delete

  # More deletions than inputs means a key set matched several rows.
  if result.size > data.size
    raise TableSync::DestroyError.new(data:, target_keys:, result:)
  end

  result
end

#find_and_save(keys:) {|entry| ... } ⇒ Object

Yields:

  • (entry)


87
88
89
90
91
92
93
# File 'lib/table_sync/receiving/model/sequel.rb', line 87

# Looks up the first entry matching +keys+, yields it to the caller for
# modification and then calls +save_changes+ on it.
#
# @param keys [Hash] lookup conditions passed to +dataset.first+
# @yield [entry] the found entry
# @return [Object, nil] result of +save_changes+, or +nil+ when nothing matched
def find_and_save(keys:)
  record = dataset.first(keys)

  if record
    yield record
    record.save_changes
  end
end

#isolation_level(lookup_code) ⇒ Object



27
28
29
# File 'lib/table_sync/receiving/model/sequel.rb', line 27

# Resolves a lookup code through +ISOLATION_LEVELS+.
#
# @param lookup_code [Symbol] one of the keys of +ISOLATION_LEVELS+
# @return [Symbol] the mapped isolation level
# @raise [KeyError] when +lookup_code+ is not a known key
def isolation_level(lookup_code) = ISOLATION_LEVELS.fetch(lookup_code)

#primary_keys ⇒ Object



35
36
37
# File 'lib/table_sync/receiving/model/sequel.rb', line 35

# Returns the model's primary key(s) as a flat, fresh array.
#
# +raw_model.primary_key+ may be a single value, an array (composite key) or
# +nil+; a +nil+ key yields an empty array rather than +[nil]+.
#
# @return [Array] primary key column names, empty when there is none
def primary_keys
  [raw_model.primary_key].flatten.compact
end

#transaction(**params) ⇒ Object



71
72
73
# File 'lib/table_sync/receiving/model/sequel.rb', line 71

# Runs the given block through +db.transaction+, passing all keyword options
# on unchanged.
#
# @param params [Hash] keyword options forwarded to +db.transaction+
# @yield the block forwarded to +db.transaction+
# @return [Object] whatever +db.transaction+ returns
def transaction(**params, &block)
  db.transaction(**params, &block)
end

#try_advisory_lock(lock_key) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/table_sync/receiving/model/sequel.rb', line 79

# Opens a transaction, asks the database for +pg_try_advisory_xact_lock+ on
# +lock_key+ and yields only when that call returns a truthy value.
#
# @param lock_key [#to_i] lock identifier; converted with +to_i+ before use
# @yield runs inside the transaction when the lock query succeeded
# @return [Object, nil] the block's value passed back through +transaction+,
#   or +nil+ when the lock was not obtained
def try_advisory_lock(lock_key)
  transaction do
    lock_query = ::Sequel.function(:pg_try_advisory_xact_lock, lock_key.to_i)
    acquired = db.get(lock_query)
    next unless acquired

    yield
  end
end

#upsert(data:, target_keys:, version_key:, default_values:) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/table_sync/receiving/model/sequel.rb', line 39

# Inserts the rows in +data+, updating existing rows on a +target_keys+
# conflict only when the stored version is lower than the incoming one.
#
# NOTE: +data+ is modified in place — each row is replaced by
# +default_values.merge(row)+ — so the caller's array reflects the defaults
# after this call.
#
# @param data [Array<Hash>] rows to upsert; the first row's keys decide which
#   columns are updated on conflict
# @param target_keys [Array<Symbol>] conflict target columns
# @param version_key [Symbol] column holding the row version
# @param default_values [Hash] values filled in for keys a row does not set
# @return [Array] result of the +returning+ multi-insert, or +[]+ for empty
#   +data+
def upsert(data:, target_keys:, version_key:, default_values:)
  # Nothing to write; also avoids calling +keys+ on a missing first row.
  return [] if data.empty?

  qualified_version = ::Sequel.qualify(raw_model.table_name, version_key)
  # Update only if coalesce(stored version, 0) < version of the incoming row.
  version_condition = ::Sequel.function(:coalesce, qualified_version, 0) <
                      ::Sequel.qualify(:excluded, version_key)

  # Built before defaults are merged, so default-only columns are not part
  # of the update spec.
  upd_spec = update_spec(data.first.keys - target_keys)
  data.map! { |d| default_values.merge(d) }

  insert_data = type_cast(data)

  dataset
    .returning
    .insert_conflict(target: target_keys, update: upd_spec, update_where: version_condition)
    .multi_insert(insert_data)
end

#validate_types(data) ⇒ Object



67
68
69
# File 'lib/table_sync/receiving/model/sequel.rb', line 67

# Runs +data+ through the types validator built in the constructor.
#
# @param data [Object] payload handed to +types_validator.validate+
# @return [Object] whatever the validator returns
def validate_types(data) = types_validator.validate(data)