Class: ChronoModel::Adapter

Inherits:
ActiveRecord::ConnectionAdapters::PostgreSQLAdapter
  • Object
show all
Defined in:
lib/chrono_model/adapter.rb

Overview

This class implements all ActiveRecord::ConnectionAdapters::SchemaStatements methods adding support for temporal extensions. It inherits from the Postgres adapter for a clean override of its methods using super.

Defined Under Namespace

Classes: TSRange

Constant Summary collapse

TEMPORAL_SCHEMA =

The schema holding current data

'temporal'
HISTORY_SCHEMA =

The schema holding historical data

'history'
RANGE_TYPE =

This is the data type used for the SCD2 validity

'tsrange'

Instance Method Summary collapse

Instance Method Details

#add_column(table_name) ⇒ Object

If adding a column to a temporal table, creates it in the table in the temporal schema and updates the triggers.



235
236
237
238
239
240
241
242
243
244
245
# File 'lib/chrono_model/adapter.rb', line 235

# Adds a column. Plain tables are handled by the parent adapter; for
# temporal tables the column is added while on the temporal schema and
# the view is then recreated, all inside a single transaction.
def add_column(table_name, *)
  if is_chrono?(table_name)
    transaction do
      # The column goes on the table living in the temporal schema
      _on_temporal_schema { super }

      # Recreate the view (and with it the triggers)
      chrono_create_view_for(table_name)
    end
  else
    super
  end
end

#add_index(table_name, column_name, options = {}) ⇒ Object

If adding an index to a temporal table, add it to the one in the temporal schema and to the history one. If the :unique option is present, it is removed from the index created in the history table.



207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/chrono_model/adapter.rb', line 207

# Adds an index. Plain tables are handled by the parent adapter; for
# temporal tables the index is created both on the temporal schema and
# on the history schema, inside a single transaction.
def add_index(table_name, column_name, options = {})
  if is_chrono?(table_name)
    transaction do
      _on_temporal_schema { super }

      # Uniqueness constraints do not make sense in the history table,
      # so the history index is built from a copy without :unique.
      history_options = options
      if options[:unique].present?
        history_options = options.dup
        history_options.delete(:unique)
      end

      _on_history_schema { super table_name, column_name, history_options }
    end
  else
    super
  end
end

#add_temporal_indexes(table, range, options = {}) ⇒ Object

Create spatial indexes for timestamp search.

This index is used by TimeMachine.at, .current and .past to build the temporal WHERE clauses that fetch the state of records at a single point in time.

Parameters:

`table`: the table where to create indexes on
`range`: the tsrange field

Options:

`:name`: the index name prefix, defaults to
         index_{table}_temporal_on_{range / lower_range / upper_range}


340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/chrono_model/adapter.rb', line 340

# Creates the three indexes used for temporal lookups on +table+:
# a GiST index on the +range+ column itself, plus one index on
# lower(range) and one on upper(range).
#
# table   - the table to create the indexes on
# range   - the range column (or expression) to index
# options - forwarded to temporal_index_names and chrono_alter_index
#
def add_temporal_indexes(table, range, options = {})
  range_idx, lower_idx, upper_idx =
    temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    execute <<-SQL
      CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} )
    SQL

    # Indexes used for precise history filtering, sorting and, in history
    # tables, by UPDATE / DELETE triggers.
    #
    execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )"
    execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )"
  end
end

#add_timeline_consistency_constraint(table, range, options = {}) ⇒ Object

Adds an EXCLUDE constraint to the given table, to assure that no more than one record can occupy a definite segment on a timeline.



382
383
384
385
386
387
388
389
390
391
392
# File 'lib/chrono_model/adapter.rb', line 382

# Adds an EXCLUDE constraint on +table+ combining equality on the id
# column with overlap (&&) on the +range+ column.
#
# table   - the table to constrain
# range   - the range column
# options - :id overrides the column compared with "="; when absent the
#           table's primary key is used. The hash is also forwarded to
#           chrono_alter_constraint.
#
# NOTE(review): unlike remove_timeline_consistency_constraint, the
# constraint name here is always derived from +table+ and ignores
# options[:prefix] - confirm this asymmetry is intended.
def add_timeline_consistency_constraint(table, range, options = {})
  name = timeline_consistency_constraint_name(table)
  id   = options[:id] || primary_key(table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} ADD CONSTRAINT #{name}
        EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && )
    SQL
  end
end

#change_column(table_name) ⇒ Object

If changing a column of a temporal table, we are forced to drop the view, then change the column in the table in the temporal schema and finally recreate the triggers.



267
268
269
270
# File 'lib/chrono_model/adapter.rb', line 267

# Changes a column definition. Temporal tables go through chrono_alter,
# everything else is left to the parent adapter.
def change_column(table_name, *)
  if is_chrono?(table_name)
    chrono_alter(table_name) { super }
  else
    super
  end
end

#change_column_default(table_name) ⇒ Object

Change the default on the temporal schema table.



274
275
276
277
# File 'lib/chrono_model/adapter.rb', line 274

# Changes a column default. For temporal tables the change is applied
# while on the temporal schema; plain tables use the parent adapter.
def change_column_default(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema { super }
  else
    super
  end
end

#change_column_null(table_name) ⇒ Object

Change the null constraint on the temporal schema table.



281
282
283
284
# File 'lib/chrono_model/adapter.rb', line 281

# Changes a column's NULL constraint. For temporal tables the change is
# applied while on the temporal schema; plain tables use the parent
# adapter.
def change_column_null(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema { super }
  else
    super
  end
end

#change_table(table_name, options = {}, &block) ⇒ Object

If changing a temporal table, redirect the change to the table in the temporal schema and recreate views.

If the :temporal option is specified, enables or disables temporal features on the given table. Please note that you’ll lose your history when demoting a temporal table to a plain one.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/chrono_model/adapter.rb', line 126

# Changes a table, optionally promoting it to (or demoting it from) a
# temporal table. Everything runs inside a single transaction.
#
# table_name - the table to change
# options    - :temporal  => true  promotes a plain table to temporal
#                            false demotes a temporal table to plain
#              :copy_data => when promoting, also seed the history table
#                            with the current rows
#              :validity  => lower bound used for the seeded rows,
#                            defaults to '0001-01-01 00:00:00'
# block      - the usual change_table block; optional
#
def change_table(table_name, options = {}, &block)
  transaction do

    # Add an empty proc to support calling change_table without a block.
    #
    block ||= proc { }

    if options[:temporal] == true
      if !is_chrono?(table_name)
        # Add temporal features to this table
        #
        # A surrogate serial key is added when the table has no primary key.
        if !primary_key(table_name)
          execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY"
        end

        # Move the table into the temporal schema, then build the history
        # table, the view and the history indexes around it.
        execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}"
        _on_history_schema { chrono_create_history_for(table_name) }
        chrono_create_view_for(table_name, options)
        copy_indexes_to_history_for(table_name)

        # Optionally copy the plain table data, setting up history
        # retroactively.
        #
        if options[:copy_data]
          seq  = _on_history_schema { serial_sequence(table_name, primary_key(table_name)) }
          from = options[:validity] || '0001-01-01 00:00:00'

          # Every current row becomes one history row, open-ended on the
          # upper bound of its validity.
          execute %[
            INSERT INTO #{HISTORY_SCHEMA}.#{table_name}
            SELECT *,
              nextval('#{seq}')        AS hid,
              tsrange('#{from}', NULL) AS validity,
              timezone('UTC', now())   AS recorded_at
            FROM #{TEMPORAL_SCHEMA}.#{table_name}
          ]
        end
      end

      # Apply the requested changes through chrono_alter
      chrono_alter(table_name, options) { super table_name, options, &block }
    else
      if options[:temporal] == false && is_chrono?(table_name)
        # Remove temporal features from this table
        #
        execute "DROP VIEW #{table_name}"

        _on_temporal_schema do
          %w( insert update delete ).each do |func|
            execute "DROP FUNCTION IF EXISTS #{table_name}_#{func}() CASCADE"
          end
        end

        _on_history_schema { execute "DROP TABLE #{table_name}" }

        # Read the current schema before switching to the temporal one,
        # so the table can be moved back into it.
        default_schema = select_value 'SELECT current_schema()'
        _on_temporal_schema do
          # Drop the surrogate key if it is the table's primary key
          if primary_key(table_name) == '__chrono_id'
            execute "ALTER TABLE #{table_name} DROP __chrono_id"
          end

          execute "ALTER TABLE #{table_name} SET SCHEMA #{default_schema}"
        end
      end

      # Plain table (or just demoted): let the parent adapter do the change
      super table_name, options, &block
    end
  end
end

#chrono_setup! ⇒ Object



468
469
470
471
472
# File 'lib/chrono_model/adapter.rb', line 468

# Sets up ChronoModel on this connection: calls chrono_create_schemas
# first, then chrono_upgrade_structure!.
def chrono_setup!
  chrono_create_schemas

  chrono_upgrade_structure!
end

#chrono_supported? ⇒ Boolean

Returns true if the connection adapter supports our implementation of temporal tables. Currently, ChronoModel is supported starting with PostgreSQL 9.3.

Returns:

  • (Boolean)


26
27
28
# File 'lib/chrono_model/adapter.rb', line 26

# True when the server version reported by postgresql_version is at
# least 90300, i.e. PostgreSQL 9.3.
def chrono_supported?
  minimum_version = 90_300
  postgresql_version >= minimum_version
end

#copy_indexes_to_history_for(table_name) ⇒ Object

Copy the indexes from the temporal table to the history table if the indexes are not already created with the same name.

Uniqueness is voluntarily ignored, as it doesn’t make sense on history tables.

Ref: GitHub pull #21.



529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# File 'lib/chrono_model/adapter.rb', line 529

# Creates on the history table every index that exists on the temporal
# table and whose name is not already taken on the history side. Only
# the access method and the columns are copied, so uniqueness is not
# carried over.
#
# Returns the list of indexes found on the temporal table.
def copy_indexes_to_history_for(table_name)
  already_present = _on_history_schema  { indexes(table_name) }.map(&:name)
  source_indexes  = _on_temporal_schema { indexes(table_name) }

  missing = source_indexes.reject { |idx| already_present.include?(idx.name) }

  missing.each do |idx|
    _on_history_schema do
      execute %[
        CREATE INDEX #{idx.name} ON #{table_name}
        USING #{idx.using} ( #{idx.columns.join(', ')} )
      ], 'Copy index from temporal to history'
    end
  end

  source_indexes
end

#create_table(table_name, options = {}) ⇒ Object

Creates the given table, possibly creating the temporal schema objects if the :temporal option is given and set to true.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/chrono_model/adapter.rb', line 33

# Creates a table. Without the :temporal option this is the parent
# adapter's create_table. With it, the table is created while on the
# temporal schema, its history counterpart is created on the history
# schema and the view is built - all in one transaction.
#
# table_name - the table to create
# options    - the usual create_table options, plus :temporal.
#              When :id is false it is overwritten with '__chrono_id'
#              (in the passed hash) and a warning is logged.
#
def create_table(table_name, options = {})
  # No temporal features requested, skip
  return super unless options[:temporal]

  if options[:id] == false
    logger.warn "WARNING - Temporal tables require a primary key."
    logger.warn "WARNING - Creating a \"__chrono_id\" primary key to fulfill the requirement"

    # The implicit super below picks up this modified options hash
    options[:id] = '__chrono_id'
  end

  transaction do
    _on_temporal_schema { super }
    _on_history_schema { chrono_create_history_for(table_name) }

    chrono_create_view_for(table_name, options)
  end
end

#drop_table(table_name) ⇒ Object

If dropping a temporal table, drops it from the temporal schema, adding the CASCADE option so as to also delete the history, view and triggers.



197
198
199
200
201
# File 'lib/chrono_model/adapter.rb', line 197

# Drops a table. Temporal tables are dropped from the temporal schema
# with CASCADE; plain tables are left to the parent adapter.
def drop_table(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema { execute "DROP TABLE #{table_name} CASCADE" }
  else
    super
  end
end

#initialize_type_map(type_map) ⇒ Object



512
513
514
515
516
517
518
519
# File 'lib/chrono_model/adapter.rb', line 512

# Lets the parent adapter populate +type_map+, then replaces the entry
# registered under TSRange::OID with a TSRange built from the subtype
# and type of the entry already present.
#
# Returns whatever the parent implementation returned.
def initialize_type_map(type_map)
  result = super

  builtin_range = type_map.fetch(TSRange::OID)
  chrono_range  = TSRange.new(builtin_range.subtype, builtin_range.type)
  type_map.register_type TSRange::OID, chrono_range

  result
end

#is_chrono?(table) ⇒ Boolean

Returns true if the given name references a temporal table.

Returns:

  • (Boolean)


446
447
448
449
450
451
452
453
454
455
456
457
458
# File 'lib/chrono_model/adapter.rb', line 446

# Tells whether +table+ is a temporal table, that is whether a data
# source with that name exists both on the temporal and on the history
# schema.
#
# An ActiveRecord::StatementInvalid recognised as PG::InvalidSchemaName
# or PG::InvalidParameterValue means the search path could not be
# switched to check for the table, and yields false. Any other
# statement error is re-raised.
def is_chrono?(table)
  in_temporal = _on_temporal_schema { chrono_data_source_exists?(table) }
  in_temporal && _on_history_schema { chrono_data_source_exists?(table) }
rescue ActiveRecord::StatementInvalid => e
  raise e unless is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue)

  false
end

#is_exception_class?(e, *klasses) ⇒ Boolean

Returns:

  • (Boolean)


460
461
462
463
464
465
466
# File 'lib/chrono_model/adapter.rb', line 460

# Tells whether exception +e+ is, or wraps, one of +klasses+.
#
# e       - the exception to inspect
# klasses - one or more exception classes to look for
#
# When +e+ exposes #original_exception, both +e+ itself and the wrapped
# exception are tested with is_a?. Testing only the wrapper would never
# match the class of the wrapped error.
#
# Otherwise the class names are looked up in the exception message.
#
# Returns true or false.
def is_exception_class?(e, *klasses)
  if e.respond_to?(:original_exception)
    wrapped = e.original_exception
    klasses.any? { |k| e.is_a?(k) || wrapped.is_a?(k) }
  else
    klasses.any? { |k| e.message =~ /#{k.name}/ }
  end
end

#on_schema(schema, nesting = true, &block) ⇒ Object

Evaluates the given block in the given schema search path.

By default, nested calls are allowed; to disable this feature, pass false as the second parameter.



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# File 'lib/chrono_model/adapter.rb', line 414

# Runs +block+ with the schema search path set to +schema+, restoring
# the previous search path afterwards.
#
# schema  - the search path to set while the block runs
# nesting - when true (default) every call switches the path; when
#           false only the outermost on_schema call does, and inner
#           calls run the block on whatever path is already set
# block   - the code to run
#
# Returns the value of the block.
def on_schema(schema, nesting = true, &block)
  # Depth counter of on_schema calls currently in progress
  @_on_schema_nesting = (@_on_schema_nesting || 0) + 1

  if nesting || @_on_schema_nesting == 1
    old_path = self.schema_search_path
    self.schema_search_path = schema
  end

  block.call

ensure
  # Same condition as above: only undo the switch if this call made it
  if (nesting || @_on_schema_nesting == 1)

    # If the transaction is aborted, any execute() call will raise
    # "transaction is aborted errors" - thus calling the Adapter's
    # setter won't update the memoized variable.
    #
    # Here we reset it to +nil+ to refresh it on the next call, as
    # there is no way to know which path will be restored when the
    # transaction ends.
    #
    if @connection.transaction_status == PG::Connection::PQTRANS_INERROR
      @schema_search_path = nil
    else
      self.schema_search_path = old_path
    end
  end
  # Always leave the nesting level, even when the block raised
  @_on_schema_nesting -= 1
end

#remove_column(table_name) ⇒ Object

If removing a column from a temporal table, we are forced to drop the view, then drop the column from the table in the temporal schema and finally recreate the triggers.



290
291
292
293
# File 'lib/chrono_model/adapter.rb', line 290

# Removes a column. Temporal tables go through chrono_alter, everything
# else is left to the parent adapter.
def remove_column(table_name, *)
  if is_chrono?(table_name)
    chrono_alter(table_name) { super }
  else
    super
  end
end

#remove_index(table_name) ⇒ Object

If removing an index from a temporal table, remove it both from the temporal and the history schemas.



223
224
225
226
227
228
229
230
# File 'lib/chrono_model/adapter.rb', line 223

# Removes an index. For temporal tables the removal is performed on
# the temporal schema and then on the history schema, within one
# transaction; plain tables use the parent adapter.
def remove_index(table_name, *)
  if is_chrono?(table_name)
    transaction do
      _on_temporal_schema { super }
      _on_history_schema { super }
    end
  else
    super
  end
end

#remove_temporal_indexes(table, range, options = {}) ⇒ Object



357
358
359
360
361
362
363
# File 'lib/chrono_model/adapter.rb', line 357

# Drops the indexes whose names temporal_index_names computes for
# +table+ and +range+. The DROP statements run inside
# chrono_alter_index.
def remove_temporal_indexes(table, range, options = {})
  index_names = temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    index_names.each do |index_name|
      execute "DROP INDEX #{index_name}"
    end
  end
end

#remove_timeline_consistency_constraint(table, options = {}) ⇒ Object



394
395
396
397
398
399
400
401
402
# File 'lib/chrono_model/adapter.rb', line 394

# Drops the timeline consistency constraint from +table+.
#
# table   - the table holding the constraint
# options - :prefix overrides the name the constraint name is derived
#           from (defaults to +table+). The hash is also forwarded to
#           chrono_alter_constraint.
#
def remove_timeline_consistency_constraint(table, options = {})
  name = timeline_consistency_constraint_name(options[:prefix] || table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} DROP CONSTRAINT #{name}
    SQL
  end
end

#rename_column(table_name) ⇒ Object

If renaming a column of a temporal table, rename it in the table in the temporal schema and update the triggers.



250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/chrono_model/adapter.rb', line 250

# Renames a column. Plain tables are handled by the parent adapter.
# For temporal tables the rename is issued twice inside a transaction -
# once on the temporal schema and once on the current search path - and
# the view is recreated afterwards.
def rename_column(table_name, *)
  if is_chrono?(table_name)
    transaction do
      # First the table in the temporal schema, then the view
      _on_temporal_schema { super }
      super

      # Recreate the view (and with it the triggers)
      chrono_create_view_for(table_name)
    end
  else
    super
  end
end

#rename_table(name, new_name) ⇒ Object

If renaming a temporal table, rename the history and view as well.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/chrono_model/adapter.rb', line 54

# Renames a table. Plain tables are handled by the parent adapter. For
# temporal tables, inside one transaction: the table and its primary
# key sequence are renamed in both the temporal and history schemas,
# the indexes are renamed, the old view and functions are dropped and
# the view is recreated under the new name.
#
# name     - the current table name
# new_name - the new table name
#
def rename_table(name, new_name)
  return super unless is_chrono?(name)

  clear_cache!

  transaction do
    # Rename tables
    #
    [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
      on_schema(schema) do
        seq     = serial_sequence(name, primary_key(name))
        # New sequence name: swap the table name in, keep only the part
        # after the last dot
        new_seq = seq.sub(name.to_s, new_name.to_s).split('.').last

        execute "ALTER SEQUENCE #{seq}  RENAME TO #{new_seq}"
        execute "ALTER TABLE    #{name} RENAME TO #{new_name}"
      end
    end


    # Rename indexes on history schema
    #
    _on_history_schema do
      standard_index_names = %w(
        inherit_pkey instance_history pkey
        recorded_at timeline_consistency )

      old_names = temporal_index_names(name, :validity) +
        standard_index_names.map {|i| [name, i].join('_') }

      new_names = temporal_index_names(new_name, :validity) +
        standard_index_names.map {|i| [new_name, i].join('_') }

      # Both lists are built in the same order, so zip pairs each old
      # name with its replacement
      old_names.zip(new_names).each do |old, new|
        execute "ALTER INDEX #{old} RENAME TO #{new}"
      end
    end

    # Rename indexes on temporal schema
    #
    _on_temporal_schema do
      # The table has already been renamed above, hence new_name here
      temporal_indexes =  indexes(new_name)
      temporal_indexes.map(&:name).each do |old_idx_name|
        # Only indexes named index_<old table>_on_<columns> are renamed;
        # the named group is read back through $~
        if old_idx_name =~ /^index_#{name}_on_(?<columns>.+)/
          new_idx_name = "index_#{new_name}_on_#{$~['columns']}"
          execute "ALTER INDEX #{old_idx_name} RENAME TO #{new_idx_name}"
        end
      end
    end

    # Drop view
    #
    execute "DROP VIEW #{name}"

    # Drop functions
    #
    %w( insert update delete ).each do |func|
      execute "DROP FUNCTION chronomodel_#{name}_#{func}()"
    end

    # Create view and functions
    #
    chrono_create_view_for(new_name)
  end
end

#temporal_index_names(table, range, options = {}) ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
# File 'lib/chrono_model/adapter.rb', line 365

# Computes the names of the three temporal indexes of +table+ for the
# given +range+ field.
#
# table   - the table name, used in the default prefix
# range   - the range field, or an expression starting with it
# options - :name overrides the default "index_{table}_temporal" prefix
#
# Returns a three-element Array:
#   [ {prefix}_on_{range}, {prefix}_on_lower_{range}, {prefix}_on_upper_{range} ]
def temporal_index_names(table, range, options = {})
  prefix = options[:name].presence || "index_#{table}_temporal"

  # For computed indexes (e.g. ends_on::timestamp + time '23:59:59')
  # only the leading field name is kept.
  field = range.to_s.sub(/\W.*/, '')

  ['', 'lower_', 'upper_'].map do |bound|
    "#{prefix}_on_#{bound}#{field}"
  end
end

#timeline_consistency_constraint_name(table) ⇒ Object



404
405
406
# File 'lib/chrono_model/adapter.rb', line 404

# Builds the name of the timeline consistency constraint for +table+:
# the table name followed by "_timeline_consistency".
def timeline_consistency_constraint_name(table)
  format('%s_timeline_consistency', table)
end