Class: Sequent::Migrations::ViewSchema
- Inherits:
-
Object
- Object
- Sequent::Migrations::ViewSchema
- Includes:
- Sql, Util::Printer, Util::Timer
- Defined in:
- lib/sequent/migrations/view_schema.rb
Overview
ViewSchema is used for migration of you view_schema. For instance when you create new Projectors or change existing Projectors.
The following migrations are supported:
-
ReplayTable (Projector migrations)
-
AlterTable (For instance if you introduce a new column)
To maintain your migrations you need to:
-
Create a class that extends ‘Sequent::Migrations::Projectors` and specify in `Sequent.configuration.migrations_class_name`
-
Define per version which migrations you want to execute See the definition of ‘Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`
-
Specify in Sequent where your sql files reside (Sequent.configuration.migration_sql_files_directory)
-
Ensure that you add %SUFFIX% to each name that needs to be unique in postgres (like TABLE names, INDEX names, PRIMARY KEYS) E.g. ‘create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`
-
If you want to run an ‘alter_table` migration ensure that
a sql file named `table_name_VERSION.sql` exists.
Example:
class AppMigrations < Sequent::Migrations::Projectors
def self.version
'3'
end
def self.versions
{
'1' => [Sequent.all_projectors],
'2' => [
UserProjector,
InvoiceProjector,
],
'3' => [
Sequent::Migrations.alter_table(UserRecord)
]
}
end
end
Defined Under Namespace
Classes: ReplayedIds, Versions
Constant Summary collapse
- LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE =
Corresponds with the index on aggregate_id column in the event_records table
Since we replay in batches of the first 3 chars of the uuid we created an index on these 3 characters. Hence the name ;-)
This also means that the online replay is divided up into 16**3 groups This might seem a lot for starting event store, but when you will get more events, you will see that this is pretty good partitioned.
3
Instance Attribute Summary collapse
-
#db_config ⇒ Object
readonly
Returns the value of attribute db_config.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#view_schema ⇒ Object
readonly
Returns the value of attribute view_schema.
Instance Method Summary collapse
-
#create_view_schema_if_not_exists ⇒ Object
Utility method that creates the view_schema and the meta data tables.
-
#create_view_tables ⇒ Object
Utility method that creates all tables in the view schema.
-
#current_version ⇒ Object
Returns the current version from the database.
- #executor ⇒ Object
-
#initialize(db_config:) ⇒ ViewSchema
constructor
A new instance of ViewSchema.
-
#migrate_offline ⇒ Object
Last part of a view schema migration.
-
#migrate_online ⇒ Object
First part of a view schema migration.
- #plan ⇒ Object
-
#replay_all! ⇒ Object
Utility method that replays events for all managed_tables from all Sequent::Core::Projector’s.
Methods included from Sql
#exec_sql, #sql_file_to_statements
Methods included from Util::Printer
Methods included from Util::Timer
Constructor Details
#initialize(db_config:) ⇒ ViewSchema
Returns a new instance of ViewSchema.
78 79 80 81 82 |
# File 'lib/sequent/migrations/view_schema.rb', line 78 def initialize(db_config:) @db_config = db_config @view_schema = Sequent.configuration.view_schema_name @logger = Sequent.logger end |
Instance Attribute Details
#db_config ⇒ Object (readonly)
Returns the value of attribute db_config.
76 77 78 |
# File 'lib/sequent/migrations/view_schema.rb', line 76 def db_config @db_config end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
76 77 78 |
# File 'lib/sequent/migrations/view_schema.rb', line 76 def logger @logger end |
#view_schema ⇒ Object (readonly)
Returns the value of attribute view_schema.
76 77 78 |
# File 'lib/sequent/migrations/view_schema.rb', line 76 def view_schema @view_schema end |
Instance Method Details
#create_view_schema_if_not_exists ⇒ Object
Utility method that creates the view_schema and the meta data tables
This method is mainly useful during an initial setup of the view schema
123 124 125 126 127 128 129 |
# File 'lib/sequent/migrations/view_schema.rb', line 123 def create_view_schema_if_not_exists exec_sql(%Q{CREATE SCHEMA IF NOT EXISTS #{view_schema}}) in_view_schema do exec_sql(%Q{CREATE TABLE IF NOT EXISTS #{Versions.table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version))}) exec_sql(%Q{CREATE TABLE IF NOT EXISTS #{ReplayedIds.table_name} (event_id bigint NOT NULL, CONSTRAINT event_id_pk PRIMARY KEY(event_id))}) end end |
#create_view_tables ⇒ Object
Utility method that creates all tables in the view schema
This method is mainly useful in test scenario to just create the entire view schema without replaying the events
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/sequent/migrations/view_schema.rb', line 95 def create_view_tables create_view_schema_if_not_exists in_view_schema do Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table| statements = sql_file_to_statements("#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql") { |raw_sql| raw_sql.remove('%SUFFIX%') } statements.each { |statement| exec_sql(statement) } indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql" if File.exist?(indexes_file_name) statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') } statements.each(&method(:exec_sql)) end end end end |
#current_version ⇒ Object
Returns the current version from the database
86 87 88 |
# File 'lib/sequent/migrations/view_schema.rb', line 86 def current_version Versions.order('version desc').limit(1).first&.version || 0 end |
#executor ⇒ Object
135 136 137 |
# File 'lib/sequent/migrations/view_schema.rb', line 135 def executor @executor ||= Executor.new end |
#migrate_offline ⇒ Object
Last part of a view schema migration
You have to ensure no events are being added to the event store while this method is running. For instance put your application in maintenance mode.
The offline part consists of:
-
Replay all events not yet replayed since #migration_online
-
Within a single transaction do:
2.1 Rename current tables with the current version as SUFFIX 2.2 Rename the new tables and remove the new version suffix 2.3 Add the new version in the Versions
table
-
Performs cleanup of replayed event ids
If anything fails an exception is raised and everything is rolled back
When this method succeeds you can safely start the application from Sequent’s point of view.
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/sequent/migrations/view_schema.rb', line 196 def migrate_offline return if Sequent.new_version == current_version ensure_version_correct! executor.set_table_names_to_new_version(plan) # 1 replay events not yet replayed replay!(Sequent.configuration.offline_replay_persistor_class.new, exclude_ids: true, group_exponent: 1) if plan.projectors.any? in_view_schema do Sequent::ApplicationRecord.transaction do # 2.1, 2.2 executor.execute_offline(plan, current_version) # 2.3 Create migration record Versions.create!(version: Sequent.new_version) end # 3. Truncate replayed ids truncate_replay_ids_table! end logger.info "Migrated to version #{Sequent.new_version}" rescue Exception => e rollback_migration raise e end |
#migrate_online ⇒ Object
First part of a view schema migration
Call this method while your application is running. The online part consists of:
-
Ensure any previous migrations are cleaned up
-
Create new tables for the Projectors which need to be migrated to the new version
These tables will be called `table_name_VERSION`.
-
Replay all events to populate the tables
It keeps track of all events that are already replayed.
If anything fails an exception is raised and everything is rolled back
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/sequent/migrations/view_schema.rb', line 153 def migrate_online return if Sequent.new_version == current_version ensure_version_correct! in_view_schema do truncate_replay_ids_table! drop_old_tables(Sequent.new_version) executor.execute_online(plan) end if plan.projectors.any? replay!(Sequent.configuration.online_replay_persistor_class.new) end in_view_schema do executor.create_indexes_after_execute_online(plan) end rescue Exception => e rollback_migration raise e end |
#plan ⇒ Object
131 132 133 |
# File 'lib/sequent/migrations/view_schema.rb', line 131 def plan @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version) end |
#replay_all! ⇒ Object
Utility method that replays events for all managed_tables from all Sequent::Core::Projector’s
This method is mainly useful in test scenario’s or development tasks
115 116 117 |
# File 'lib/sequent/migrations/view_schema.rb', line 115 def replay_all! replay!(Sequent.configuration.online_replay_persistor_class.new) end |