Class: Sequent::Migrations::ViewSchema

Inherits:
Object
Includes:
Sql, Util::Printer, Util::Timer
Defined in:
lib/sequent/migrations/view_schema.rb

Overview

ViewSchema is used to migrate your view_schema, for instance when you create new Projectors or change existing Projectors.

The following migrations are supported:

  • ReplayTable (Projector migrations)

  • AlterTable (For instance if you introduce a new column)

To maintain your migrations you need to:

  1. Create a class that extends `Sequent::Migrations::Projectors` and specify it in `Sequent.configuration.migrations_class_name`.

  2. Define per version which migrations you want to execute. See the definitions of `Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`.

  3. Specify in Sequent where your sql files reside (`Sequent.configuration.migration_sql_files_directory`).

  4. Ensure that you add %SUFFIX% to each name that needs to be unique in postgres (like TABLE names, INDEX names, PRIMARY KEYS). E.g. `create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`

  5. If you want to run an `alter_table` migration, ensure that a sql file named `table_name_VERSION.sql` exists.

Example:

class AppMigrations < Sequent::Migrations::Projectors
  def self.version
    '3'
  end

  def self.versions
    {
      '1' => [Sequent.all_projectors],
      '2' => [
        UserProjector,
        InvoiceProjector,
      ],
      '3' => [
        Sequent::Migrations.alter_table(UserRecord)
      ]
    }
  end
end
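
The %SUFFIX% placeholder from step 4 can be pictured as a plain string substitution. The sketch below is illustrative only: `render_sql` is a hypothetical helper, not part of Sequent, and it assumes the versioned name takes the `table_name_VERSION` form used elsewhere on this page.

```ruby
# Hypothetical helper (not part of Sequent): fill in the %SUFFIX% placeholder.
# With a version, names become e.g. foo_3; without one the placeholder is removed.
def render_sql(raw_sql, version: nil)
  suffix = version ? "_#{version}" : ''
  raw_sql.gsub('%SUFFIX%', suffix)
end

template = 'create table foo%SUFFIX% (id serial NOT NULL, ' \
           'CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))'

puts render_sql(template, version: 3) # versioned names used while migrating
puts render_sql(template)             # plain names, as in #create_view_tables
```

Because every unique name in the file carries the placeholder, the same sql file can serve both the temporary versioned tables and the final tables.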

Defined Under Namespace

Classes: ReplayedIds, Versions

Constant Summary

LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE = 3

Corresponds with the index on the aggregate_id column in the event_records table.

Since we replay in batches based on the first 3 chars of the uuid, we created an index on these 3 characters. Hence the name ;-)

This also means that the online replay is divided up into 16**3 groups. This might seem a lot for a starting event store, but as you get more events you will see that this partitions the data well.
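
To make the 16**3 figure concrete, the sketch below enumerates every possible 3-character hexadecimal prefix. It is a standalone illustration: `hex_prefixes` is a hypothetical helper, and how Sequent actually builds its replay groups is not shown on this page.

```ruby
# Hypothetical helper: all lowercase hex strings of the given length,
# i.e. every possible leading substring of a uuid of that length.
def hex_prefixes(length)
  (0...16**length).map { |i| i.to_s(16).rjust(length, '0') }
end

groups = hex_prefixes(3)
puts groups.size             # 16**3 groups in total
puts groups.first(3).inspect # lowest prefixes
puts groups.last             # highest prefix
```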


Methods included from Sql

#exec_sql, #sql_file_to_statements

Methods included from Util::Printer

#recursively_print

Methods included from Util::Timer

#time

Constructor Details

#initialize(db_config:) ⇒ ViewSchema

Returns a new instance of ViewSchema.



# File 'lib/sequent/migrations/view_schema.rb', line 82

def initialize(db_config:)
  @db_config = db_config
  @view_schema = Sequent.configuration.view_schema_name
  @logger = Sequent.logger
end

Instance Attribute Details

#db_config ⇒ Object (readonly)

Returns the value of attribute db_config.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def db_config
  @db_config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def logger
  @logger
end

#view_schema ⇒ Object (readonly)

Returns the value of attribute view_schema.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def view_schema
  @view_schema
end

Instance Method Details

#create_view_schema_if_not_exists ⇒ Object

Utility method that creates the view_schema and the meta data tables

This method is mainly useful during an initial setup of the view schema



# File 'lib/sequent/migrations/view_schema.rb', line 130

def create_view_schema_if_not_exists
  exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema}))
  in_view_schema do
    exec_sql(<<~SQL.chomp)
      CREATE TABLE IF NOT EXISTS #{Versions.table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version))
    SQL
    exec_sql(<<~SQL.chomp)
      CREATE TABLE IF NOT EXISTS #{ReplayedIds.table_name} (event_id bigint NOT NULL, CONSTRAINT event_id_pk PRIMARY KEY(event_id))
    SQL
  end
end

#create_view_tables ⇒ Object

Utility method that creates all tables in the view schema

This method is mainly useful in test scenarios, to create the entire view schema without replaying the events.



# File 'lib/sequent/migrations/view_schema.rb', line 99

def create_view_tables
  create_view_schema_if_not_exists
  in_view_schema do
    Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table|
      sql_file = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql"
      statements = sql_file_to_statements(sql_file) do |raw_sql|
        raw_sql.remove('%SUFFIX%')
      end
      statements.each { |statement| exec_sql(statement) }

      indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql"
      if File.exist?(indexes_file_name)
        statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') }
        statements.each(&method(:exec_sql))
      end
    end
  end
end

#current_version ⇒ Object

Returns the current version from the database



# File 'lib/sequent/migrations/view_schema.rb', line 90

def current_version
  Versions.order('version desc').limit(1).first&.version || 0
end

#executor ⇒ Object



# File 'lib/sequent/migrations/view_schema.rb', line 146

def executor
  @executor ||= Executor.new
end

#migrate_offline ⇒ Object

Last part of a view schema migration

You have to ensure no events are being added to the event store while this method is running. For instance put your application in maintenance mode.

The offline part consists of:

  1. Replay all events not yet replayed since #migrate_online

  2. Within a single transaction do:

     2.1 Rename current tables with the current version as SUFFIX
     2.2 Rename the new tables and remove the new version suffix
     2.3 Add the new version in the Versions table

  3. Perform cleanup of replayed event ids

If anything fails, an exception is raised and everything is rolled back.

When this method succeeds you can safely start the application from Sequent’s point of view.



# File 'lib/sequent/migrations/view_schema.rb', line 207

def migrate_offline
  return if Sequent.new_version == current_version

  ensure_version_correct!

  executor.set_table_names_to_new_version(plan)

  # 1 replay events not yet replayed
  if plan.projectors.any?
    replay!(
      Sequent.configuration.offline_replay_persistor_class.new,
      exclude_ids: true,
      group_exponent: 1,
    )
  end

  in_view_schema do
    Sequent::ApplicationRecord.transaction do
      # 2.1, 2.2
      executor.execute_offline(plan, current_version)
      # 2.3 Create migration record
      Versions.create!(version: Sequent.new_version)
    end

    # 3. Truncate replayed ids
    truncate_replay_ids_table!
  end
  logger.info "Migrated to version #{Sequent.new_version}"
  # rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  rollback_migration
  raise e
end
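
Steps 2.1 and 2.2 amount to a pair of renames per table. The sketch below only builds the statements as strings so the naming scheme is visible. `swap_statements` is a hypothetical helper, not Sequent's executor. It assumes every table already exists in the current version and ignores indexes and constraints, which also need unique names.

```ruby
# Hypothetical sketch (not Sequent's implementation): build the rename
# statements for steps 2.1 and 2.2 for a list of table names.
def swap_statements(table_names, current_version:, new_version:)
  table_names.flat_map do |table|
    [
      # 2.1 park the live table under its current version
      "ALTER TABLE #{table} RENAME TO #{table}_#{current_version}",
      # 2.2 promote the freshly replayed table to the live name
      "ALTER TABLE #{table}_#{new_version} RENAME TO #{table}",
    ]
  end
end

puts swap_statements(%w[user_records], current_version: 2, new_version: 3)
```

Running both renames inside one transaction, as the method above does, means readers see either the old table or the new one, never a missing table.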

#migrate_online ⇒ Object

First part of a view schema migration

Call this method while your application is running. The online part consists of:

  1. Ensure any previous migrations are cleaned up

  2. Create new tables for the Projectors which need to be migrated to the new version. These tables will be called `table_name_VERSION`.

  3. Replay all events to populate the tables. It keeps track of all events that are already replayed.

If anything fails an exception is raised and everything is rolled back



# File 'lib/sequent/migrations/view_schema.rb', line 164

def migrate_online
  return if Sequent.new_version == current_version

  ensure_version_correct!

  in_view_schema do
    truncate_replay_ids_table!

    drop_old_tables(Sequent.new_version)
    executor.execute_online(plan)
  end

  replay!(Sequent.configuration.online_replay_persistor_class.new) if plan.projectors.any?

  in_view_schema do
    executor.create_indexes_after_execute_online(plan)
  end
  # rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  rollback_migration
  raise e
end
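
The bookkeeping shared by #migrate_online and #migrate_offline (record which events were replayed online, then replay only the remainder offline and clear the record) can be modelled in memory. This is a toy model under stated assumptions: `Event` and `ReplaySketch` are hypothetical names, and nothing here touches a database or Sequent itself.

```ruby
require 'set'

# Hypothetical stand-in for a stored event.
Event = Struct.new(:id, :payload)

# Toy model of the two replay phases; not Sequent's implementation.
class ReplaySketch
  attr_reader :projected, :replayed_ids

  def initialize
    @projected = []         # stands in for the new view tables
    @replayed_ids = Set.new # stands in for the ReplayedIds table
  end

  # Online phase: replay everything seen so far and remember the ids.
  def replay_online(events)
    events.each { |event| project(event) }
  end

  # Offline phase: replay only events that arrived after the online
  # phase, then clear the bookkeeping.
  def replay_offline(events)
    events.reject { |event| @replayed_ids.include?(event.id) }
          .each { |event| project(event) }
    @replayed_ids.clear
  end

  private

  def project(event)
    @projected << event.payload
    @replayed_ids << event.id
  end
end

events = [Event.new(1, 'created'), Event.new(2, 'renamed')]
sketch = ReplaySketch.new
sketch.replay_online(events)
events << Event.new(3, 'deleted') # arrives while the application is still running
sketch.replay_offline(events)
puts sketch.projected.inspect
```

Each event is projected exactly once, which is why the offline window only has to cover events that arrived after the online replay.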

#plan ⇒ Object



# File 'lib/sequent/migrations/view_schema.rb', line 142

def plan
  @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version)
end
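
As a rough mental model of what a plan contains, the sketch below picks the migrations for every version after the current one, up to and including the target. It is a deliberate simplification: `pending_migrations` is a hypothetical function, strings stand in for projector classes, and Sequent's `Planner` may apply further rules that are not documented on this page.

```ruby
# Simplified, hypothetical planner: collect the migrations of every version
# greater than `current` and at most `target`, in ascending version order.
def pending_migrations(versions, current:, target:)
  versions
    .select { |version, _| version.to_i > current && version.to_i <= target }
    .sort_by { |version, _| version.to_i }
    .flat_map { |_, migrations| migrations }
end

# Strings stand in for projector classes and alter_table migrations.
versions = {
  '1' => %w[UserProjector],
  '2' => %w[UserProjector InvoiceProjector],
  '3' => %w[alter_table(UserRecord)],
}

p pending_migrations(versions, current: 1, target: 3)
```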

#replay_all! ⇒ Object

Utility method that replays events for all managed_tables from all Sequent::Core::Projectors

This method is mainly useful in test scenarios or development tasks



# File 'lib/sequent/migrations/view_schema.rb', line 122

def replay_all!
  replay!(Sequent.configuration.online_replay_persistor_class.new)
end