Class: ChronoModel::Adapter

Inherits:
ActiveRecord::ConnectionAdapters::PostgreSQLAdapter
  • Object
show all
Defined in:
lib/chrono_model/adapter.rb

Overview

This class implements all ActiveRecord::ConnectionAdapters::SchemaStatements methods adding support for temporal extensions. It inherits from the Postgres adapter for a clean override of its methods using super.

Defined Under Namespace

Classes: TSRange

Constant Summary collapse

TEMPORAL_SCHEMA =

The schema holding current data

'temporal'
HISTORY_SCHEMA =

The schema holding historical data

'history'
RANGE_TYPE =

This is the data type used for the SCD2 validity

'tsrange'
# Registry of table names and whether each one is known to be temporal.
# Keys are always the stringified table name.
TableCache =
(Class.new(Hash) do
  # Returns every table name currently held in the cache.
  def all
    keys
  end

  # Marks +table+ as temporal.
  def add!(table)
    self[table.to_s] = true
  end

  # Forgets +table+. The key is removed rather than set to nil, so that
  # #all no longer reports tables that were dropped or renamed.
  # Returns nil.
  def del!(table)
    delete(table.to_s)
    nil
  end

  # Returns the cached value for +table+ when it is truthy; otherwise
  # evaluates the block, stores its result and returns it. Because of
  # the ||= assignment, falsy results are computed again on next call.
  def fetch(table)
    self[table.to_s] ||= yield
  end
end).new

Instance Method Summary collapse

Instance Method Details

#add_column(table_name) ⇒ Object

If adding a column to a temporal table, creates it in the table in the temporal schema and updates the triggers.



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/chrono_model/adapter.rb', line 228

# Adds a column to +table_name+.
#
# Plain tables are handed straight to the parent implementation. For a
# temporal table the column is created, inside a transaction, on the
# table living in the temporal schema and the view is then recreated.
def add_column(table_name, *)
  if is_chrono?(table_name)
    transaction do
      # The physical column belongs to the temporal schema table
      _on_temporal_schema { super }

      # Recreate the view (this is what refreshes the triggers)
      chrono_create_view_for(table_name)
    end
  else
    super
  end
end

#add_index(table_name, column_name, options = {}) ⇒ Object

If adding an index to a temporal table, add it to the one in the temporal schema and to the history one. If the :unique option is present, it is removed from the index created in the history table.



200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/chrono_model/adapter.rb', line 200

# Adds an index on +column_name+ of +table_name+.
#
# Plain tables are delegated to the parent implementation. For temporal
# tables the index is created both on the temporal schema table and on
# the history schema table; the :unique option, when present, is
# stripped from a copy of the options before creating the history index.
def add_index(table_name, column_name, options = {})
  return super unless is_chrono?(table_name)

  transaction do
    _on_temporal_schema { super }

    # Uniqueness constraints do not make sense in the history table.
    # A copy is modified, so the hash given by the caller stays intact.
    if options[:unique].present?
      options = options.dup
      options.delete(:unique)
    end

    _on_history_schema { super table_name, column_name, options }
  end
end

#add_temporal_indexes(table, range, options = {}) ⇒ Object

Create spatial indexes for timestamp search.

This index is used by TimeMachine.at, .current and .past to build the temporal WHERE clauses that fetch the state of records at a single point in time.

Parameters:

`table`: the table where to create indexes on
`range`: the tsrange field

Options:

`:name`: the index name prefix, defaults to
         index_{table}_temporal_on_{range / lower_range / upper_range}


320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/chrono_model/adapter.rb', line 320

# Creates the three indexes used for timestamp searches on +table+:
# a gist index on the +range+ column itself, plus one index on
# lower(range) and one on upper(range).
#
# table   - the table to create the indexes on
# range   - the tsrange column (or expression)
# options - forwarded to temporal_index_names and chrono_alter_index;
#           :name sets the index name prefix
#
# The statements are executed inside the block given to
# chrono_alter_index.
def add_temporal_indexes(table, range, options = {})
  range_idx, lower_idx, upper_idx =
    temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    execute <<-SQL
      CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} )
    SQL

    # Indexes used for precise history filtering, sorting and, in history
    # tables, by UPDATE / DELETE triggers.
    #
    execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )"
    execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )"
  end
end

#add_timeline_consistency_constraint(table, range, options = {}) ⇒ Object

Adds an EXCLUDE constraint to the given table, to assure that no more than one record can occupy a definite segment on a timeline.



362
363
364
365
366
367
368
369
370
371
372
# File 'lib/chrono_model/adapter.rb', line 362

# Adds an EXCLUDE constraint on +table+ combining equality on the id
# column with overlap (&&) on +range+.
#
# table   - the table to constrain
# range   - the range column
# options - :id overrides the column used for the equality check
#           (defaults to the table primary key); the whole hash is also
#           forwarded to chrono_alter_constraint
#
# The constraint name comes from timeline_consistency_constraint_name.
def add_timeline_consistency_constraint(table, range, options = {})
  name = timeline_consistency_constraint_name(table)
  id   = options[:id] || primary_key(table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} ADD CONSTRAINT #{name}
        EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && )
    SQL
  end
end

#change_column(table_name) ⇒ Object

If changing a column of a temporal table, we are forced to drop the view, then change the column in the table in the temporal schema and finally recreate the triggers.



260
261
262
263
# File 'lib/chrono_model/adapter.rb', line 260

# Changes a column definition on +table_name+.
#
# Plain tables go straight to the parent implementation; for temporal
# tables the parent implementation runs inside the chrono_alter block.
def change_column(table_name, *)
  if is_chrono?(table_name)
    chrono_alter(table_name) { super }
  else
    super
  end
end

#change_column_default(table_name) ⇒ Object

Change the default on the temporal schema table.



267
268
269
270
# File 'lib/chrono_model/adapter.rb', line 267

# Changes a column default on +table_name+.
#
# For temporal tables the parent implementation is run with the
# temporal schema selected; plain tables are delegated untouched.
def change_column_default(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema { super }
  else
    super
  end
end

#change_column_null(table_name) ⇒ Object

Change the null constraint on the temporal schema table.



274
275
276
277
# File 'lib/chrono_model/adapter.rb', line 274

# Changes the NULL constraint of a column on +table_name+.
#
# For temporal tables the parent implementation is run with the
# temporal schema selected; plain tables are delegated untouched.
def change_column_null(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema { super }
  else
    super
  end
end

#change_table(table_name, options = {}, &block) ⇒ Object

If changing a temporal table, redirect the change to the table in the temporal schema and recreate views.

If the :temporal option is specified, enables or disables temporal features on the given table. Please note that you’ll lose your history when demoting a temporal table to a plain one.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/chrono_model/adapter.rb', line 113

# Changes +table_name+, optionally promoting it to a temporal table
# (options[:temporal] == true) or demoting it back to a plain one
# (options[:temporal] == false). Everything runs in one transaction.
#
# table_name - the table to change
# options    - :temporal  - true to enable, false to disable temporal
#                           features; any other value leaves them as is
#              :copy_data - when promoting, also copy the current rows
#                           into the history table
#              :validity  - lower bound of the validity range used for
#                           copied rows (defaults to '0001-01-01 00:00:00')
# block      - the usual change_table block, forwarded to the parent
#              implementation (optional)
def change_table(table_name, options = {}, &block)
  transaction do

    # Add an empty proc to support calling change_table without a block.
    #
    block ||= proc { }

    if options[:temporal] == true
      if !is_chrono?(table_name)
        # Add temporal features to this table
        #
        # A primary key is added when the table has none
        if !primary_key(table_name)
          execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY"
        end

        # Move the plain table into the temporal schema, then build the
        # history table and the view, and mirror the existing indexes.
        execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}"
        _on_history_schema { chrono_create_history_for(table_name) }
        chrono_create_view_for(table_name, options)
        copy_indexes_to_history_for(table_name)

        TableCache.add! table_name

        # Optionally copy the plain table data, setting up history
        # retroactively.
        #
        if options[:copy_data]
          seq  = _on_history_schema { serial_sequence(table_name, primary_key(table_name)) }
          from = options[:validity] || '0001-01-01 00:00:00'

          execute %[
            INSERT INTO #{HISTORY_SCHEMA}.#{table_name}
            SELECT *,
              nextval('#{seq}')        AS hid,
              tsrange('#{from}', NULL) AS validity,
              timezone('UTC', now())   AS recorded_at
            FROM #{TEMPORAL_SCHEMA}.#{table_name}
          ]
        end
      end

      # Apply the requested changes through chrono_alter
      chrono_alter(table_name) { super table_name, options, &block }
    else
      if options[:temporal] == false && is_chrono?(table_name)
        # Remove temporal features from this table
        #
        execute "DROP VIEW #{table_name}"

        # NOTE(review): the functions dropped here are named
        # "#{table_name}_#{func}", while rename_table renames functions
        # named "chronomodel_#{name}_#{func}". With IF EXISTS a name
        # mismatch would pass silently - confirm which naming is current.
        _on_temporal_schema do
          %w( insert update delete ).each do |func|
            execute "DROP FUNCTION IF EXISTS #{table_name}_#{func}() CASCADE"
          end
        end

        # The history is lost at this point
        _on_history_schema { execute "DROP TABLE #{table_name}" }

        # Remember the default schema before switching search path, so
        # the table can be moved back to it.
        default_schema = select_value 'SELECT current_schema()'
        _on_temporal_schema do
          # Drop the primary key only when it is the generated one
          if primary_key(table_name) == '__chrono_id'
            execute "ALTER TABLE #{table_name} DROP __chrono_id"
          end

          execute "ALTER TABLE #{table_name} SET SCHEMA #{default_schema}"
        end

        TableCache.del! table_name
      end

      # Plain table (or just demoted): regular change_table
      super table_name, options, &block
    end
  end
end

#chrono_setup! ⇒ Object



492
493
494
495
496
497
# File 'lib/chrono_model/adapter.rb', line 492

# Runs the adapter setup steps in order: schema creation, type map
# setup and finally the structure upgrade.
def chrono_setup!
  chrono_create_schemas
  chrono_setup_type_map

  # Runs last, after schemas and type map have been set up
  chrono_upgrade_structure!
end

#chrono_supported? ⇒ Boolean

Returns true if the connection adapter supports our implementation of temporal tables. Currently, ChronoModel is supported starting with PostgreSQL 9.3.

Returns:

  • (Boolean)


24
25
26
# File 'lib/chrono_model/adapter.rb', line 24

# Returns true when the server version reported by postgresql_version
# is at least 90300 (PostgreSQL 9.3), false otherwise.
def chrono_supported?
  minimum_version = 9_03_00
  postgresql_version >= minimum_version
end

#copy_indexes_to_history_for(table_name) ⇒ Object

Copy the indexes from the temporal table to the history table if the indexes are not already created with the same name.

Uniqueness is voluntarily ignored, as it doesn’t make sense on history tables.

Ref: GitHub pull #21.



507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'lib/chrono_model/adapter.rb', line 507

# Creates, on the history schema table, every index of the temporal
# schema table whose name is not already taken there.
#
# Only name, access method and columns are carried over, so uniqueness
# is not replicated on the history table.
def copy_indexes_to_history_for(table_name)
  existing_names = _on_history_schema { indexes(table_name) }.map(&:name)
  source_indexes = _on_temporal_schema { indexes(table_name) }

  source_indexes.each do |index|
    unless existing_names.include?(index.name)
      _on_history_schema do
        execute %[
        CREATE INDEX #{index.name} ON #{table_name}
        USING #{index.using} ( #{index.columns.join(', ')} )
      ], 'Copy index from temporal to history'
      end
    end
  end
end

#create_table(table_name, options = {}) ⇒ Object

Creates the given table, possibly creating the temporal schema objects if the :temporal option is given and set to true.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/chrono_model/adapter.rb', line 31

# Creates +table_name+. Unless options[:temporal] is truthy this is
# the plain parent implementation.
#
# For temporal tables, inside a transaction: the table is created in
# the temporal schema, its history counterpart in the history schema,
# the view is created and the table is registered in TableCache.
#
# table_name - name of the table to create
# options    - the usual create_table options plus :temporal. When
#              :id is false it is replaced by '__chrono_id' (a warning
#              is logged). The caller's hash is never modified.
def create_table(table_name, options = {})
  # No temporal features requested, skip
  return super unless options[:temporal]

  if options[:id] == false
    logger.warn "WARNING - Temporal tables require a primary key."
    logger.warn "WARNING - Creating a \"__chrono_id\" primary key to fulfill the requirement"

    # Rebind the local to a modified copy instead of writing into the
    # hash owned by the caller. The bare +super+ below forwards the
    # current value of +options+, so the parent sees the new :id.
    options = options.merge(id: '__chrono_id')
  end

  transaction do
    _on_temporal_schema { super }
    _on_history_schema { chrono_create_history_for(table_name) }

    chrono_create_view_for(table_name, options)

    TableCache.add! table_name
  end
end

#drop_table(table_name) ⇒ Object

If dropping a temporal table, drops it from the temporal schema, adding the CASCADE option so as to also delete the history, view and triggers.



188
189
190
191
192
193
194
# File 'lib/chrono_model/adapter.rb', line 188

# Drops +table_name+.
#
# Plain tables are delegated to the parent implementation. A temporal
# table is dropped from the temporal schema with CASCADE and is then
# removed from TableCache.
def drop_table(table_name, *)
  if is_chrono?(table_name)
    _on_temporal_schema do
      execute "DROP TABLE #{table_name} CASCADE"
    end

    TableCache.del! table_name
  else
    super
  end
end

#is_chrono?(table) ⇒ Boolean

Returns true if the given name references a temporal table.

Returns:

  • (Boolean)


433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/chrono_model/adapter.rb', line 433

# Tells whether +table+ is temporal, i.e. a table with that name exists
# both in the temporal and in the history schema. The answer goes
# through TableCache.fetch.
#
# Returns false when the lookup raises ActiveRecord::StatementInvalid
# caused by PG::InvalidSchemaName or PG::InvalidParameterValue (the
# search path could not be switched); any other StatementInvalid is
# raised again.
def is_chrono?(table)
  TableCache.fetch(table) do
    in_temporal = _on_temporal_schema { table_exists?(table) }
    in_temporal && _on_history_schema { table_exists?(table) }
  end
rescue ActiveRecord::StatementInvalid => e
  # Only a failure to change the search path means "not temporal"
  schema_missing =
    is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue)
  raise e unless schema_missing

  false
end

#is_exception_class?(e, *klasses) ⇒ Boolean

Returns:

  • (Boolean)


449
450
451
452
453
454
455
# File 'lib/chrono_model/adapter.rb', line 449

# Tells whether the error +e+ is, or was caused by, one of +klasses+.
#
# e       - the rescued exception
# klasses - exception classes to look for
#
# When +e+ exposes #original_exception, both +e+ itself and the wrapped
# exception are tested with is_a?. Testing only the wrapper would never
# match the driver error it carries. Otherwise the class names are
# searched for in the error message.
#
# Returns true or false.
def is_exception_class?(e, *klasses)
  if e.respond_to?(:original_exception)
    wrapped = e.original_exception
    klasses.any? { |k| e.is_a?(k) || wrapped.is_a?(k) }
  else
    klasses.any? { |k| e.message =~ /#{k.name}/ }
  end
end

#on_schema(schema, nesting = true, &block) ⇒ Object

Evaluates the given block in the given schema search path.

By default, nested calls are allowed; to disable this feature, pass false as the second parameter.



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/chrono_model/adapter.rb', line 394

# Evaluates the given block with +schema+ as the schema search path and
# returns the block result. The previous search path is restored
# afterwards, also when the block raises.
#
# schema  - the search path to set while the block runs
# nesting - when false, only the outermost call switches the search
#           path; calls made from within another on_schema block leave
#           it alone
# block   - the code to run
def on_schema(schema, nesting = true, &block)
  # Depth counter, used to tell the outermost call from nested ones
  @_on_schema_nesting = (@_on_schema_nesting || 0) + 1

  if nesting || @_on_schema_nesting == 1
    old_path = self.schema_search_path
    self.schema_search_path = schema
  end

  block.call

ensure
  if (nesting || @_on_schema_nesting == 1)

    # If the transaction is aborted, any execute() call will raise
    # "transaction is aborted errors" - thus calling the Adapter's
    # setter won't update the memoized variable.
    #
    # Here we reset it to +nil+ to refresh it on the next call, as
    # there is no way to know which path will be restored when the
    # transaction ends.
    #
    # The constant is read from PG::Connection rather than from the
    # legacy PGconn alias, matching the PG:: namespace used by the
    # error classes referenced elsewhere in this adapter.
    #
    if @connection.transaction_status == PG::Connection::PQTRANS_INERROR
      @schema_search_path = nil
    else
      self.schema_search_path = old_path
    end
  end
  @_on_schema_nesting -= 1
end

#remove_column(table_name) ⇒ Object

If removing a column from a temporal table, we are forced to drop the view, then drop the column from the table in the temporal schema and eventually recreate the triggers.



283
284
285
286
# File 'lib/chrono_model/adapter.rb', line 283

# Removes a column from +table_name+.
#
# Plain tables go straight to the parent implementation; for temporal
# tables the parent implementation runs inside the chrono_alter block.
def remove_column(table_name, *)
  if is_chrono?(table_name)
    chrono_alter(table_name) { super }
  else
    super
  end
end

#remove_index(table_name) ⇒ Object

If removing an index from a temporal table, remove it both from the temporal and the history schemas.



216
217
218
219
220
221
222
223
# File 'lib/chrono_model/adapter.rb', line 216

# Removes an index from +table_name+.
#
# Plain tables are delegated to the parent implementation. For temporal
# tables the parent implementation is run twice in one transaction:
# first on the temporal schema, then on the history schema.
def remove_index(table_name, *)
  if is_chrono?(table_name)
    transaction do
      _on_temporal_schema { super }
      _on_history_schema  { super }
    end
  else
    super
  end
end

#remove_temporal_indexes(table, range, options = {}) ⇒ Object



337
338
339
340
341
342
343
# File 'lib/chrono_model/adapter.rb', line 337

# Drops the indexes whose names temporal_index_names returns for
# +table+ / +range+ / +options+. The DROP INDEX statements are executed
# inside the block given to chrono_alter_index.
def remove_temporal_indexes(table, range, options = {})
  index_names = temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    index_names.each do |index_name|
      execute "DROP INDEX #{index_name}"
    end
  end
end

#remove_timeline_consistency_constraint(table, options = {}) ⇒ Object



374
375
376
377
378
379
380
381
382
# File 'lib/chrono_model/adapter.rb', line 374

# Drops the timeline consistency constraint from +table+.
#
# table   - the table holding the constraint
# options - :prefix replaces the table name when computing the
#           constraint name; the whole hash is also forwarded to
#           chrono_alter_constraint
def remove_timeline_consistency_constraint(table, options = {})
  name = timeline_consistency_constraint_name(options[:prefix] || table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} DROP CONSTRAINT #{name}
    SQL
  end
end

#rename_column(table_name) ⇒ Object

If renaming a column of a temporal table, rename it in the table in the temporal schema and update the triggers.



243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/chrono_model/adapter.rb', line 243

# Renames a column of +table_name+.
#
# Plain tables are delegated to the parent implementation. For temporal
# tables, in one transaction, the parent implementation is run on the
# temporal schema and then once more on the current search path (the
# view), after which the view is recreated.
def rename_column(table_name, *)
  if is_chrono?(table_name)
    transaction do
      # Table in the temporal schema first, then the view
      _on_temporal_schema { super }
      super

      # Recreate the view (this is what refreshes the triggers)
      chrono_create_view_for(table_name)
    end
  else
    super
  end
end

#rename_table(name, new_name) ⇒ Object

If renaming a temporal table, rename the history and view as well.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/chrono_model/adapter.rb', line 54

# Renames the table +name+ to +new_name+.
#
# Plain tables are delegated to the parent implementation. For temporal
# tables, in one transaction: the primary key sequence and the table
# are renamed in both the temporal and the history schema, the history
# indexes and the trigger functions are renamed, the view is renamed,
# and TableCache is updated.
def rename_table(name, new_name)
  return super unless is_chrono?(name)

  clear_cache!

  transaction do
    # Rename tables
    #
    [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
      on_schema(schema) do
        seq = serial_sequence(name, primary_key(name))

        # Strip the schema qualifier BEFORE substituting the table name:
        # substituting on the qualified name would hit the schema part
        # whenever the table name occurs in it (e.g. "temp" within
        # "temporal"), leaving the sequence name unchanged.
        new_seq = seq.split('.').last.sub(name.to_s, new_name.to_s)

        execute "ALTER SEQUENCE #{seq}  RENAME TO #{new_seq}"
        execute "ALTER TABLE    #{name} RENAME TO #{new_name}"
      end
    end

    # Rename indexes
    #
    _on_history_schema do
      standard_index_names = %w(
        inherit_pkey instance_history pkey
        recorded_at timeline_consistency )

      old_names = temporal_index_names(name, :validity) +
        standard_index_names.map {|i| [name, i].join('_') }

      new_names = temporal_index_names(new_name, :validity) +
        standard_index_names.map {|i| [new_name, i].join('_') }

      old_names.zip(new_names).each do |old, new|
        execute "ALTER INDEX #{old} RENAME TO #{new}"
      end
    end

    # Rename functions
    #
    %w( insert update delete ).each do |func|
      execute "ALTER FUNCTION chronomodel_#{name}_#{func}() RENAME TO chronomodel_#{new_name}_#{func}"
    end

    # Rename the public view
    #
    execute "ALTER VIEW #{name} RENAME TO #{new_name}"

    TableCache.del! name
    TableCache.add! new_name
  end
end

#temporal_index_names(table, range, options = {}) ⇒ Object



345
346
347
348
349
350
351
352
353
354
355
# File 'lib/chrono_model/adapter.rb', line 345

# Builds the names of the three temporal indexes of +table+: the one on
# the range itself, the one on its lower bound and the one on its upper
# bound, in this order.
#
# table   - the table name, used in the default prefix
# range   - the range column or expression
# options - :name overrides the default "index_{table}_temporal" prefix
#
# Returns an Array of three Strings.
def temporal_index_names(table, range, options = {})
  prefix = options[:name].presence || "index_#{table}_temporal"

  # For computed indexes (e.g. ends_on::timestamp + time '23:59:59')
  # only the leading field name is kept.
  column = range.to_s.sub(/\W.*/, '')

  ['', 'lower_', 'upper_'].map do |bound|
    "#{prefix}_on_#{bound}#{column}"
  end
end

#timeline_consistency_constraint_name(table) ⇒ Object



384
385
386
# File 'lib/chrono_model/adapter.rb', line 384

# Returns the name of the timeline consistency constraint of +table+:
# the table name followed by "_timeline_consistency".
def timeline_consistency_constraint_name(table)
  suffix = '_timeline_consistency'
  "#{table}#{suffix}"
end