Class: PgOnlineSchemaChange::Query

Inherits:
Object
  • Object
show all
Extended by:
Helper
Defined in:
lib/pg_online_schema_change/query.rb

Constant Summary collapse

INDEX_SUFFIX =
"_pgosc"
DROPPED_COLUMN_TYPE =
:AT_DropColumn
RENAMED_COLUMN_TYPE =
:AT_RenameColumn
LOCK_ATTEMPT =
4

Class Method Summary collapse

Methods included from Helper

logger, method_missing, primary_key, respond_to_missing?

Class Method Details

.alter_statement?(query) ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
20
21
22
# File 'lib/pg_online_schema_change/query.rb', line 16

# Tells whether +query+ is made up exclusively of ALTER TABLE and/or RENAME
# statements.
#
# Every parsed statement must be a PgQuery::AlterTableStmt or a
# PgQuery::RenameStmt. A query that parses to zero statements (for example an
# empty string) is rejected explicitly, because Enumerable#all? on an empty
# collection would otherwise be vacuously true.
#
# @param query [String] SQL text to inspect
# @return [Boolean] true when at least one statement is present and all of
#   them are ALTER TABLE / RENAME statements; false otherwise, including when
#   PgQuery raises PgQuery::ParseError for the text
def alter_statement?(query)
  statements = PgQuery.parse(query).tree.stmts
  # Nothing parsed means nothing to alter.
  return false if statements.empty?

  statements.all? do |statement|
    node = statement.stmt
    node.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt) ||
      node.rename_stmt.instance_of?(PgQuery::RenameStmt)
  end
rescue PgQuery::ParseError
  false
end

.alter_statement_for(client, shadow_table) ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/pg_online_schema_change/query.rb', line 85

# Re-targets the client's ALTER statement at +shadow_table+.
#
# The statement is parsed, the relation name of every ALTER TABLE and RENAME
# statement in it is overwritten with +shadow_table+, and the modified parse
# result is deparsed.
#
# @param client [Object] object whose +alter_statement+ returns the SQL text
# @param shadow_table [String] relation name to write into the statements
# @return [Object] the value returned by +deparse+ on the modified parse result
def alter_statement_for(client, shadow_table)
  parsed = PgQuery.parse(client.alter_statement)

  parsed.tree.stmts.each do |raw_statement|
    node = raw_statement.stmt

    alter_table = node.alter_table_stmt
    alter_table.relation.relname = shadow_table if alter_table

    rename = node.rename_stmt
    rename.relation.relname = shadow_table if rename
  end

  parsed.deparse
end

.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/pg_online_schema_change/query.rb', line 291

# Builds the INSERT ... SELECT statement that copies rows from the client's
# table into +shadow_table+.
#
# Source columns come from +table_columns+ for +client.table+, minus any name
# in +dropped_columns_list+. Destination columns are the same names with the
# renames from +renamed_columns_list+ applied. Both lists are passed through
# +client.connection.quote_ident+ before being joined.
#
# @param client [Object] object exposing +table+ and +connection+
# @param shadow_table [String] name interpolated as the INSERT target
# @param reuse_trasaction [Boolean] forwarded unchanged to +table_columns+
# @return [String] the three-line SQL statement, newline terminated
def copy_data_statement(client, shadow_table, reuse_trasaction = false)
  source_names = table_columns(client, client.table, reuse_trasaction).map do |column|
    column["column_name_regular"]
  end
  source_names -= dropped_columns_list if dropped_columns_list.any?

  # Destination names start as a copy of the source names. Renames are applied
  # one after another, so a later rename sees the result of an earlier one.
  target_names = source_names.dup
  if renamed_columns_list.any?
    renamed_columns_list.each do |rename|
      target_names.map! { |name| name == rename[:old_name] ? rename[:new_name] : name }
    end
  end

  quote = ->(name) { client.connection.quote_ident(name) }
  insert_list = target_names.map(&quote).join(", ")
  select_list = source_names.map(&quote).join(", ")

  <<~SQL
    INSERT INTO #{shadow_table}(#{insert_list})
    SELECT #{select_list}
    FROM ONLY #{client.table}
  SQL
end

.dropped_columns(client) ⇒ Object



191
192
193
194
195
196
197
198
199
# File 'lib/pg_online_schema_change/query.rb', line 191

# Lists the names of the columns that the client's ALTER statement drops.
#
# Only ALTER TABLE statements are examined; within each, a command contributes
# its name when its subtype equals DROPPED_COLUMN_TYPE.
#
# @param client [Object] object whose +alter_statement+ returns the SQL text
# @return [Array] names of the dropped columns, in statement order; empty when
#   nothing is dropped
def dropped_columns(client)
  PgQuery.parse(client.alter_statement).tree.stmts.flat_map do |raw_statement|
    alter_table = raw_statement.stmt.alter_table_stmt
    # Statements that are not ALTER TABLE contribute nothing.
    next [] if alter_table.nil?

    alter_table.cmds.filter_map do |cmd|
      table_cmd = cmd.alter_table_cmd
      table_cmd.name if table_cmd.subtype == DROPPED_COLUMN_TYPE
    end
  end
end

.get_all_constraints_for(client) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/pg_online_schema_change/query.rb', line 111

# Reads every foreign-key ('f') and primary-key ('p') row from pg_constraint.
#
# The query is not restricted to a particular table; callers filter the rows
# by the "table_on" / "table_from" fields themselves.
#
# @param client [Object] object exposing +connection+
# @return [Array] one entry per constraint row with the fields "table_on",
#   "table_from", "constraint_type", "constraint_name",
#   "constraint_validated" and "definition"; empty when the block passed to
#   +run+ is never invoked
def get_all_constraints_for(client)
  sql = <<~SQL
    SELECT  conrelid::regclass AS table_on,
            confrelid::regclass AS table_from,
            contype as constraint_type,
            conname AS constraint_name,
            convalidated AS constraint_validated,
            pg_get_constraintdef(oid) AS definition
    FROM   	pg_constraint
    WHERE  	contype IN ('f', 'p')
  SQL

  rows = []
  run(client.connection, sql) { |result| rows = result.to_a }
  rows
end

.get_foreign_keys_for(client, table) ⇒ Object



137
138
139
140
141
# File 'lib/pg_online_schema_change/query.rb', line 137

# Selects the foreign-key constraints declared on +table+.
#
# @param client [Object] forwarded to +get_all_constraints_for+
# @param table [String] value compared against each row's "table_on"
# @return [Array] rows whose "table_on" equals +table+ and whose
#   "constraint_type" is "f"
def get_foreign_keys_for(client, table)
  get_all_constraints_for(client).select do |constraint|
    declared_here = constraint["table_on"] == table
    declared_here && constraint["constraint_type"] == "f"
  end
end

.get_foreign_keys_to_validate(client, table) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/pg_online_schema_change/query.rb', line 176

# Builds the VALIDATE CONSTRAINT statements for every foreign key that
# involves +table+.
#
# Foreign keys that reference +table+ ("table_from" equals +table+) come
# first, followed by foreign keys declared on +table+ ("table_on" equals
# +table+). A self-referencing foreign key satisfies both conditions; it is
# emitted only once.
#
# @param client [Object] forwarded to +get_all_constraints_for+
# @param table [String] value compared against "table_from" and "table_on"
# @return [String] the ALTER TABLE ... VALIDATE CONSTRAINT statements joined
#   without a separator; empty when no foreign key matches
def get_foreign_keys_to_validate(client, table)
  foreign_keys = get_all_constraints_for(client).select do |row|
    row["constraint_type"] == "f"
  end

  referential_foreign_keys = foreign_keys.select { |row| row["table_from"] == table }
  self_foreign_keys = foreign_keys.select { |row| row["table_on"] == table }

  # uniq drops the second copy of a self-referencing key, which appears in
  # both lists.
  (referential_foreign_keys + self_foreign_keys).uniq.map do |row|
    "ALTER TABLE #{row["table_on"]} VALIDATE CONSTRAINT #{row["constraint_name"]};"
  end.join
end

.get_indexes_for(client, table) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/pg_online_schema_change/query.rb', line 96

# Fetches the index definitions recorded in pg_indexes for +table+ within the
# client's schema.
#
# @param client [Object] object exposing +schema+ and +connection+
# @param table [String] table name interpolated into the tablename filter
# @return [Array] the "indexdef" value of every returned row; empty when the
#   block passed to +run+ is never invoked
def get_indexes_for(client, table)
  sql = <<~SQL
    SELECT indexdef, schemaname
    FROM pg_indexes
    WHERE schemaname = '#{client.schema}' AND tablename = '#{table}'
  SQL

  definitions = []
  run(client.connection, sql) do |result|
    definitions = result.map { |index_row| index_row["indexdef"] }
  end
  definitions
end

.get_primary_keys_for(client, table) ⇒ Object



131
132
133
134
135
# File 'lib/pg_online_schema_change/query.rb', line 131

# Selects the primary-key constraints declared on +table+.
#
# @param client [Object] forwarded to +get_all_constraints_for+
# @param table [String] value compared against each row's "table_on"
# @return [Array] rows whose "table_on" equals +table+ and whose
#   "constraint_type" is "p"
def get_primary_keys_for(client, table)
  get_all_constraints_for(client).select do |constraint|
    constraint.values_at("table_on", "constraint_type") == [table, "p"]
  end
end

.kill_backends(client, table) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
# File 'lib/pg_online_schema_change/query.rb', line 279

# Terminates the other backends that hold a relation lock on +table+, when the
# client has opted in.
#
# Nothing is logged or executed unless +client.kill_backends+ is truthy. The
# query selects from pg_locks and excludes the current backend through
# pg_backend_pid().
#
# @param client [Object] object exposing +kill_backends+ and +connection+
# @param table [String] relation name interpolated into the regclass cast
# @return [Object, nil] the value returned by +run+, or nil when
#   +client.kill_backends+ is falsy
def kill_backends(client, table)
  if client.kill_backends
    logger.info("Terminating other backends")

    terminate_sql = <<~SQL
      SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation' AND relation = '#{table}'::regclass::oid AND pid <> pg_backend_pid()
    SQL

    # Third argument true: run is asked to reuse the transaction.
    run(client.connection, terminate_sql, true)
  end
end

.open_lock_exclusive(client, table) ⇒ Object

This function acquires the lock and keeps the transaction open. If the lock is acquired, it is up to the caller to call COMMIT to end the transaction. If the lock is not acquired, the transaction is closed and a new transaction is started to try to acquire the lock again.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/pg_online_schema_change/query.rb', line 253

# Tries to take an ACCESS EXCLUSIVE lock on +client.table+, retrying a bounded
# number of times.
#
# The lock statements go through +run+ with its third argument set to true,
# so +run+ does not send its COMMIT on success and the transaction is still
# open when this method returns true.
#
# NOTE(review): the LOCK statement targets +client.table+, while the +table+
# argument is only forwarded to +kill_backends+ — confirm callers always pass
# the same relation for both.
#
# @param client [Object] object exposing +wait_time_for_lock+, +table+ and
#   +connection+
# @param table [String] relation handed to +kill_backends+ between attempts
# @return [Boolean] true when the lock statements ran without raising; false
#   once the attempts are exhausted
def open_lock_exclusive(client, table)
  # `retry` below re-runs the method body; ||= keeps the counter's value
  # across those re-runs instead of resetting it to 1.
  attempts ||= 1

  # lock_timeout is interpolated in seconds from client.wait_time_for_lock.
  query = <<~SQL
    SET lock_timeout = '#{client.wait_time_for_lock}s';
    LOCK TABLE #{client.table} IN ACCESS EXCLUSIVE MODE;
  SQL
  run(client.connection, query, true)

  true
rescue PG::LockNotAvailable, PG::InFailedSqlTransaction
  # The counter is incremented before the comparison. With LOCK_ATTEMPT = 4
  # the values 2 and 3 retry and 4 gives up, i.e. the lock is tried at most
  # three times. NOTE(review): confirm three (not four) tries is intended.
  if (attempts += 1) < LOCK_ATTEMPT
    logger.info("Couldn't acquire lock, attempt: #{attempts}")

    run(client.connection, "RESET lock_timeout;")
    # Only has an effect when client.kill_backends is truthy.
    kill_backends(client, table)

    retry
  end

  logger.info("Lock acquire failed")
  run(client.connection, "RESET lock_timeout;")

  false
end

.primary_key_for(client, table) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/pg_online_schema_change/query.rb', line 212

# Looks up a primary-key column of +table+ in the client's schema.
#
# The query joins pg_index, pg_class, pg_attribute and pg_namespace and keeps
# the attributes that belong to the primary index. Only the first returned
# column name is handed back, so for a multi-column key the remaining columns
# are ignored.
#
# @param client [Object] object exposing +schema+ and +connection+
# @param table [String] relation name interpolated into the regclass cast
# @return [String, nil] the first "column_name" returned, or nil when no row
#   comes back
def primary_key_for(client, table)
  sql = <<~SQL
    SELECT
      pg_attribute.attname as column_name
    FROM pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
      pg_class.oid = '#{table}'::regclass AND
      indrelid = pg_class.oid AND
      nspname = '#{client.schema}' AND
      pg_class.relnamespace = pg_namespace.oid AND
      pg_attribute.attrelid = pg_class.oid AND
      pg_attribute.attnum = any(pg_index.indkey)
    AND indisprimary
  SQL

  key_columns = []
  run(client.connection, sql) do |result|
    key_columns = result.map { |key_row| key_row["column_name"] }
  end

  key_columns[0]
end

.referential_foreign_keys_to_refresh(client, table) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/pg_online_schema_change/query.rb', line 143

# Builds DROP + ADD statements for every foreign key that references +table+.
#
# Each matching constraint yields "ALTER TABLE ... DROP CONSTRAINT ...;"
# followed by a space and "ALTER TABLE ... ADD CONSTRAINT ... <definition>;".
# The re-added constraint always ends in NOT VALID: the suffix is appended
# unless the stored definition already ends with it.
#
# @param client [Object] forwarded to +get_all_constraints_for+
# @param table [String] value compared against each row's "table_from"
# @return [String] all statement pairs joined without a separator; empty when
#   no foreign key references +table+
def referential_foreign_keys_to_refresh(client, table)
  inbound_keys = get_all_constraints_for(client).select do |constraint|
    constraint["table_from"] == table && constraint["constraint_type"] == "f"
  end

  inbound_keys.map do |constraint|
    owner, name, definition = constraint.values_at("table_on", "constraint_name", "definition")
    suffix = definition.end_with?("NOT VALID") ? "" : " NOT VALID"

    drop_sql = "ALTER TABLE #{owner} DROP CONSTRAINT #{name};"
    add_sql = "ALTER TABLE #{owner} ADD CONSTRAINT #{name} #{definition}#{suffix};"

    [drop_sql, add_sql].join(" ")
  end.join
end

.renamed_columns(client) ⇒ Object



201
202
203
204
205
206
207
208
209
210
# File 'lib/pg_online_schema_change/query.rb', line 201

# Collects the column renames contained in the client's ALTER statement.
#
# Statements without a rename node are skipped.
#
# @param client [Object] object whose +alter_statement+ returns the SQL text
# @return [Array<Hash>] one hash per rename statement with the keys
#   :old_name (the statement's subname) and :new_name (its newname)
def renamed_columns(client)
  PgQuery.parse(client.alter_statement).tree.stmts.filter_map do |raw_statement|
    rename = raw_statement.stmt.rename_stmt

    { old_name: rename.subname, new_name: rename.newname } unless rename.nil?
  end
end

.run(connection, query, reuse_trasaction = false, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/pg_online_schema_change/query.rb', line 45

# Executes +query+ on +connection+ after sending BEGIN, then either commits or
# rolls back.
#
# On success COMMIT is sent unless +reuse_trasaction+ is truthy, in which case
# the transaction is left open for the caller. On any exception the statement
# is cancelled if needed, ROLLBACK is sent and the exception is re-raised.
#
# @param connection [Object] presumably a PG::Connection (its
#   +transaction_status+ is compared against PG::PQTRANS_* constants) — TODO
#   confirm
# @param query [String] SQL passed to +async_exec+
# @param reuse_trasaction [Boolean] when truthy, skip the COMMIT on success
# @param block [Proc] forwarded to +async_exec+ together with +query+
# @return [Object] the value returned by +async_exec+ for +query+
# @raise [Exception] whatever was raised while running, re-raised after the
#   rollback sequence
def run(connection, query, reuse_trasaction = false, &block)
  # Cancel first if the connection reports a failed or unknown transaction
  # state.
  connection.cancel if [PG::PQTRANS_INERROR, PG::PQTRANS_UNKNOWN].include?(connection.transaction_status)

  logger.debug("Running query", { query: query })

  # BEGIN is sent unconditionally, also when reuse_trasaction is true.
  connection.async_exec("BEGIN;")

  result = connection.async_exec(query, &block)
# Rescues every exception class, not only StandardError; it is re-raised below.
rescue Exception # rubocop:disable Lint/RescueException
  # Cancel only when the connection is not idle.
  connection.cancel if connection.transaction_status != PG::PQTRANS_IDLE
  connection.block
  logger.info("Exception raised, rolling back query", { rollback: true, query: query })
  connection.async_exec("ROLLBACK;")
  # NOTE(review): COMMIT is sent right after ROLLBACK; the reason is not
  # evident from this method — confirm it is intentional.
  connection.async_exec("COMMIT;")
  raise
else
  connection.async_exec("COMMIT;") unless reuse_trasaction
  result
end

.same_table?(query) ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/pg_online_schema_change/query.rb', line 24

# Tells whether all ALTER TABLE / RENAME statements in +query+ name exactly
# one relation.
#
# Statements of any other kind are ignored. A query with no ALTER TABLE or
# RENAME statement yields false, because no relation name is collected.
#
# @param query [String] SQL text to inspect
# @return [Boolean] true when exactly one distinct relation name is found;
#   false otherwise, including when PgQuery raises PgQuery::ParseError
def same_table?(query)
  relation_names = PgQuery.parse(query).tree.stmts.filter_map do |raw_statement|
    node = raw_statement.stmt
    target =
      if node.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt)
        node.alter_table_stmt
      elsif node.rename_stmt.instance_of?(PgQuery::RenameStmt)
        node.rename_stmt
      end

    target.relation.relname if target
  end

  relation_names.uniq.size == 1
rescue PgQuery::ParseError
  false
end

.self_foreign_keys_to_refresh(client, table) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/pg_online_schema_change/query.rb', line 161

# Builds ADD CONSTRAINT statements for every foreign key declared on +table+.
#
# Each statement re-adds the constraint with its stored definition and always
# ends in NOT VALID: the suffix is appended unless the definition already ends
# with it. No DROP statement is produced here.
#
# @param client [Object] forwarded to +get_all_constraints_for+
# @param table [String] value compared against each row's "table_on"
# @return [String] the statements joined without a separator; empty when
#   +table+ declares no foreign key
def self_foreign_keys_to_refresh(client, table)
  own_keys = get_all_constraints_for(client).select do |constraint|
    constraint["table_on"] == table && constraint["constraint_type"] == "f"
  end

  own_keys.map do |constraint|
    definition = constraint["definition"]
    suffix = definition.end_with?("NOT VALID") ? "" : " NOT VALID"

    "ALTER TABLE #{constraint["table_on"]} ADD CONSTRAINT #{constraint["constraint_name"]} #{definition}#{suffix};"
  end.join
end

.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/pg_online_schema_change/query.rb', line 235

# Reads the storage parameters (pg_class.reloptions) recorded for +table+.
#
# The lookup matches on relname only; the query contains no schema filter.
#
# @param client [Object] object exposing +connection+
# @param table [String] relation name interpolated into the relname filter
# @param reuse_trasaction [Boolean] forwarded unchanged to +run+
# @return [String, nil] the "params" value of the first returned row (the
#   reloptions joined with commas), or nil when no row comes back
def storage_parameters_for(client, table, reuse_trasaction = false)
  sql = <<~SQL
    SELECT array_to_string(reloptions, ',') as params FROM pg_class WHERE relname='#{table}';
  SQL

  params = []
  run(client.connection, sql, reuse_trasaction) do |result|
    params = result.map { |option_row| option_row["params"] }
  end

  params[0]
end

.table(query) ⇒ Object



38
39
40
41
42
43
# File 'lib/pg_online_schema_change/query.rb', line 38

# Extracts the name of the table that +query+ operates on.
#
# The first entry of the parse result's +tables+ list wins. When that list is
# empty, the relation name of the first RENAME statement is used instead. The
# query is parsed once, and the statement scan for the fallback only runs when
# it is needed.
#
# @param query [String] SQL text to inspect
# @return [String, nil] the table name, or nil when neither source yields one
# @raise [PgQuery::ParseError] not rescued here; whatever PgQuery.parse raises
#   for +query+ propagates to the caller
def table(query)
  parsed = PgQuery.parse(query)

  parsed.tables.first || parsed.tree.stmts.filter_map do |statement|
    statement.stmt.rename_stmt&.relation&.relname
  end.first
end

.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/pg_online_schema_change/query.rb', line 65

# Lists the live (non-dropped, attnum > 0) columns of a table from
# pg_attribute, ordered by attnum.
#
# Every returned row is enriched in place:
# * "column_name_regular" keeps the name exactly as returned by the query,
# * "column_name" is replaced by its +quote_ident+ form,
# * "column_position" is converted with +to_i+.
#
# @param client [Object] object exposing +table+ and +connection+
# @param table [String, nil] relation to inspect; falls back to +client.table+
#   when nil
# @param reuse_trasaction [Boolean] forwarded unchanged to +run+
# @return [Array<Hash>] one hash per column with the keys "column_name",
#   "type", "column_position" and "column_name_regular"; empty when the block
#   passed to +run+ is never invoked
def table_columns(client, table = nil, reuse_trasaction = false)
  relation = table || client.table
  columns_sql = <<~SQL
    SELECT attname as column_name, format_type(atttypid, atttypmod) as type, attnum as column_position FROM   pg_attribute
    WHERE  attrelid = '#{relation}'::regclass AND attnum > 0 AND NOT attisdropped
    ORDER  BY attnum;
  SQL

  columns = []
  run(client.connection, columns_sql, reuse_trasaction) do |result|
    columns = result.map do |column|
      # The right-hand side is evaluated before the merge, so the raw name is
      # read before "column_name" is overwritten.
      column.merge!(
        "column_name_regular" => column["column_name"],
        "column_name" => client.connection.quote_ident(column["column_name"]),
        "column_position" => column["column_position"].to_i
      )
    end
  end
  columns
end