Class: PgOnlineSchemaChange::Replay
- Inherits:
-
Object
- Object
- PgOnlineSchemaChange::Replay
- Extended by:
- Helper
- Defined in:
- lib/pg_online_schema_change/replay.rb
Class Method Summary collapse
-
.begin! ⇒ Object
This, picks PULL_BATCH_COUNT rows by primary key from audit_table, replays it on the shadow_table.
- .play!(rows, reuse_trasaction = false) ⇒ Object
- .reserved_columns ⇒ Object
- .rows_to_play(reuse_trasaction = false) ⇒ Object
Methods included from Helper
logger, method_missing, primary_key, respond_to_missing?
Class Method Details
.begin! ⇒ Object
This, picks PULL_BATCH_COUNT rows by primary key from audit_table, replays it on the shadow_table. Once the batch is done, it them deletes those PULL_BATCH_COUNT rows from audit_table. Then, pull another batch, check if the row count matches PULL_BATCH_COUNT, if so swap, otherwise continue. Swap because, the row count is minimal to replay them altogether and perform the rename while holding an access exclusive lock for minimal time.
16 17 18 19 20 21 22 23 24 |
# File 'lib/pg_online_schema_change/replay.rb', line 16 def begin! loop do rows = rows_to_play raise CountBelowDelta if rows.count <= client.delta_count play!(rows) end end |
.play!(rows, reuse_trasaction = false) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/pg_online_schema_change/replay.rb', line 41 def play!(rows, reuse_trasaction = false) logger.info("Replaying rows, count: #{rows.size}") to_be_deleted_rows = [] to_be_replayed = [] rows.each do |row| new_row = row.dup # Remove audit table cols, since we will be # re-mapping them for inserts and updates reserved_columns.each do |col| new_row.delete(col) end if dropped_columns_list.any? dropped_columns_list.each do |dropped_column| new_row.delete(dropped_column) end end if renamed_columns_list.any? renamed_columns_list.each do |object| value = new_row.delete(object[:old_name]) new_row[object[:new_name]] = value end end new_row = new_row.compact # quote indent column to preserve case insensitivity # ensure rows are escaped new_row = new_row.transform_keys do |column| client.connection.quote_ident(column) end new_row = new_row.transform_values do |value| client.connection.escape_string(value) end case row[operation_type_column] when "INSERT" values = new_row.map { |_, val| "'#{val}'" }.join(",") sql = <<~SQL INSERT INTO #{shadow_table} (#{new_row.keys.join(",")}) VALUES (#{values}); SQL to_be_replayed << sql to_be_deleted_rows << "'#{row[audit_table_pk]}'" when "UPDATE" set_values = new_row.map do |column, value| "#{column} = '#{value}'" end.join(",") sql = <<~SQL UPDATE #{shadow_table} SET #{set_values} WHERE #{primary_key}=\'#{row[primary_key]}\'; SQL to_be_replayed << sql to_be_deleted_rows << "'#{row[audit_table_pk]}'" when "DELETE" sql = <<~SQL DELETE FROM #{shadow_table} WHERE #{primary_key}=\'#{row[primary_key]}\'; SQL to_be_replayed << sql to_be_deleted_rows << "'#{row[audit_table_pk]}'" end end Query.run(client.connection, to_be_replayed.join, reuse_trasaction) # Delete items from the audit now that are replayed return unless to_be_deleted_rows.count >= 1 delete_query = <<~SQL DELETE FROM #{audit_table} WHERE #{audit_table_pk} IN (#{to_be_deleted_rows.join(",")}) SQL Query.run(client.connection, delete_query, reuse_trasaction) end |
.reserved_columns ⇒ Object
37 38 39 |
# File 'lib/pg_online_schema_change/replay.rb', line 37 def reserved_columns @reserved_columns ||= [trigger_time_column, operation_type_column, audit_table_pk] end |
.rows_to_play(reuse_trasaction = false) ⇒ Object
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/pg_online_schema_change/replay.rb', line 26 def rows_to_play(reuse_trasaction = false) select_query = <<~SQL SELECT * FROM #{audit_table} ORDER BY #{audit_table_pk} LIMIT #{client.pull_batch_count}; SQL rows = [] Query.run(client.connection, select_query, reuse_trasaction) { |result| rows = result.map { |row| row } } rows end |