Class: PgOnlineSchemaChange::Query
- Inherits:
-
Object
- Object
- PgOnlineSchemaChange::Query
show all
- Extended by:
- Helper
- Defined in:
- lib/pg_online_schema_change/query.rb
Constant Summary
collapse
- INDEX_SUFFIX =
"_pgosc"
- DROPPED_COLUMN_TYPE =
:AT_DropColumn
- RENAMED_COLUMN_TYPE =
:AT_RenameColumn
- LOCK_ATTEMPT =
4
Class Method Summary
collapse
-
.alter_statement?(query) ⇒ Boolean
-
.alter_statement_for(client, shadow_table) ⇒ Object
-
.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object
-
.dropped_columns(client) ⇒ Object
-
.get_all_constraints_for(client) ⇒ Object
-
.get_foreign_keys_for(client, table) ⇒ Object
-
.get_foreign_keys_to_validate(client, table) ⇒ Object
-
.get_indexes_for(client, table) ⇒ Object
-
.get_primary_keys_for(client, table) ⇒ Object
-
.kill_backends(client, table) ⇒ Object
-
.open_lock_exclusive(client, table) ⇒ Object
This function acquires the lock and keeps the transaction open.
-
.primary_key_for(client, table) ⇒ Object
-
.referential_foreign_keys_to_refresh(client, table) ⇒ Object
-
.renamed_columns(client) ⇒ Object
-
.run(connection, query, reuse_trasaction = false, &block) ⇒ Object
-
.same_table?(query) ⇒ Boolean
-
.self_foreign_keys_to_refresh(client, table) ⇒ Object
-
.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object
-
.table(query) ⇒ Object
-
.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object
Methods included from Helper
logger, method_missing, primary_key, respond_to_missing?
Class Method Details
.alter_statement?(query) ⇒ Boolean
16
17
18
19
20
21
22
|
# File 'lib/pg_online_schema_change/query.rb', line 16
# Reports whether +query+ consists solely of ALTER TABLE and/or RENAME statements.
#
# A query that parses to zero statements is rejected: +all?+ on an empty list
# is vacuously true and would otherwise classify "nothing" as a valid ALTER
# statement.
#
# @param query [String] SQL text handed to PgQuery.parse
# @return [Boolean] false when the query has no statements, cannot be parsed
#   (PgQuery::ParseError), or contains any other kind of statement
def alter_statement?(query)
  statements = PgQuery.parse(query).tree.stmts
  return false if statements.empty?

  statements.all? do |statement|
    node = statement.stmt
    node.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt) ||
      node.rename_stmt.instance_of?(PgQuery::RenameStmt)
  end
rescue PgQuery::ParseError
  false
end
|
.alter_statement_for(client, shadow_table) ⇒ Object
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/pg_online_schema_change/query.rb', line 85
# Rewrites the client's ALTER/RENAME statement(s) so they target +shadow_table+.
#
# The statement is parsed, the relation name of every ALTER TABLE and RENAME
# node is replaced, and the modified tree is turned back into SQL text.
#
# @param client [Object] supplies +alter_statement+ (SQL text)
# @param shadow_table [String] relation name written into each statement
# @return [Object] whatever +deparse+ returns for the modified parse result
def alter_statement_for(client, shadow_table)
  ast = PgQuery.parse(client.alter_statement)

  ast.tree.stmts.each do |raw_statement|
    node = raw_statement.stmt

    alter = node.alter_table_stmt
    alter.relation.relname = shadow_table if alter

    rename = node.rename_stmt
    rename.relation.relname = shadow_table if rename
  end

  ast.deparse
end
|
.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
# File 'lib/pg_online_schema_change/query.rb', line 291
# Builds the INSERT ... SELECT statement that copies rows from client.table
# into +shadow_table+.
#
# Columns listed in dropped_columns_list are left out of both column lists.
# A column matching an :old_name in renamed_columns_list keeps that name in
# the SELECT list and uses the matching :new_name in the INSERT list.
#
# @param client [Object] supplies +table+ and +connection+ (used for quote_ident)
# @param shadow_table [String] destination table, interpolated as given
# @param reuse_trasaction [Boolean] forwarded to table_columns
# @return [String] the SQL text
def copy_data_statement(client, shadow_table, reuse_trasaction = false)
  source_columns = table_columns(client, client.table, reuse_trasaction)
                   .map { |column| column["column_name_regular"] }
  source_columns -= dropped_columns_list if dropped_columns_list.any?

  # Renames are applied one after another, so a later rename sees the names
  # produced by an earlier one.
  target_columns = renamed_columns_list.reduce(source_columns.dup) do |names, rename|
    names.map { |name| name == rename[:old_name] ? rename[:new_name] : name }
  end

  insert_list = target_columns.map { |name| client.connection.quote_ident(name) }.join(", ")
  select_list = source_columns.map { |name| client.connection.quote_ident(name) }.join(", ")

  <<~SQL
    INSERT INTO #{shadow_table}(#{insert_list})
    SELECT #{select_list}
    FROM ONLY #{client.table}
  SQL
end
|
.dropped_columns(client) ⇒ Object
191
192
193
194
195
196
197
198
199
|
# File 'lib/pg_online_schema_change/query.rb', line 191
# Lists the column names removed by the client's ALTER statement.
#
# Only ALTER TABLE statements are inspected; within them, only commands whose
# subtype equals DROPPED_COLUMN_TYPE contribute a name.
#
# @param client [Object] supplies +alter_statement+ (SQL text)
# @return [Array] names of dropped columns, in statement order
def dropped_columns(client)
  statements = PgQuery.parse(client.alter_statement).tree.stmts
  alter_nodes = statements.filter_map { |statement| statement.stmt.alter_table_stmt }

  alter_nodes.flat_map do |alter_node|
    alter_node.cmds.filter_map do |command|
      details = command.alter_table_cmd
      details.name if details.subtype == DROPPED_COLUMN_TYPE
    end
  end
end
|
.get_all_constraints_for(client) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/pg_online_schema_change/query.rb', line 111
# Fetches every foreign-key ('f') and primary-key ('p') row from pg_constraint.
#
# The query carries no table filter; callers narrow the rows themselves. Each
# row exposes table_on, table_from, constraint_type, constraint_name,
# constraint_validated and definition.
#
# @param client [Object] supplies +connection+
# @return [Array] constraint rows, or an empty array if +run+ never yields
def get_all_constraints_for(client)
  constraint_sql = <<~SQL
    SELECT conrelid::regclass AS table_on,
    confrelid::regclass AS table_from,
    contype as constraint_type,
    conname AS constraint_name,
    convalidated AS constraint_validated,
    pg_get_constraintdef(oid) AS definition
    FROM pg_constraint
    WHERE contype IN ('f', 'p')
  SQL

  rows = []
  run(client.connection, constraint_sql) { |result| rows = result.to_a }
  rows
end
|
.get_foreign_keys_for(client, table) ⇒ Object
137
138
139
140
141
|
# File 'lib/pg_online_schema_change/query.rb', line 137
# Selects the foreign-key constraints declared on +table+.
#
# @param client [Object] forwarded to get_all_constraints_for
# @param table [String] value compared against each row's "table_on"
# @return [Array] rows whose "table_on" equals +table+ and whose "constraint_type" is "f"
def get_foreign_keys_for(client, table)
  get_all_constraints_for(client).select do |constraint|
    constraint.values_at("table_on", "constraint_type") == [table, "f"]
  end
end
|
.get_foreign_keys_to_validate(client, table) ⇒ Object
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/pg_online_schema_change/query.rb', line 176
# Builds the VALIDATE CONSTRAINT statements for every foreign key touching +table+.
#
# Foreign keys that reference +table+ ("table_from") come first, followed by
# the foreign keys declared on +table+ itself ("table_on"). A constraint that
# matches both appears twice.
#
# @param client [Object] forwarded to get_all_constraints_for
# @param table [String] table name compared against the constraint rows
# @return [String] the statements concatenated without a separator ("" when none)
def get_foreign_keys_to_validate(client, table)
  foreign_keys = get_all_constraints_for(client).select { |row| row["constraint_type"] == "f" }

  inbound = foreign_keys.select { |row| row["table_from"] == table }
  own = foreign_keys.select { |row| row["table_on"] == table }

  statements = (inbound + own).map do |row|
    "ALTER TABLE #{row["table_on"]} VALIDATE CONSTRAINT #{row["constraint_name"]};"
  end
  statements.join
end
|
.get_indexes_for(client, table) ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/pg_online_schema_change/query.rb', line 96
# Reads the index definitions recorded in pg_indexes for +table+ within
# client.schema.
#
# @param client [Object] supplies +schema+ and +connection+
# @param table [String] table name, interpolated into the query as given
# @return [Array] the "indexdef" value of each row, or an empty array if +run+ never yields
def get_indexes_for(client, table)
  index_sql = <<~SQL
    SELECT indexdef, schemaname
    FROM pg_indexes
    WHERE schemaname = \'#{client.schema}\' AND tablename = \'#{table}\'
  SQL

  definitions = []
  run(client.connection, index_sql) do |result|
    definitions = result.map { |index_row| index_row["indexdef"] }
  end
  definitions
end
|
.get_primary_keys_for(client, table) ⇒ Object
131
132
133
134
135
|
# File 'lib/pg_online_schema_change/query.rb', line 131
# Selects the primary-key constraints declared on +table+.
#
# @param client [Object] forwarded to get_all_constraints_for
# @param table [String] value compared against each row's "table_on"
# @return [Array] rows whose "table_on" equals +table+ and whose "constraint_type" is "p"
def get_primary_keys_for(client, table)
  get_all_constraints_for(client).select do |constraint|
    constraint.values_at("table_on", "constraint_type") == [table, "p"]
  end
end
|
.kill_backends(client, table) ⇒ Object
279
280
281
282
283
284
285
286
287
288
289
|
# File 'lib/pg_online_schema_change/query.rb', line 279
# Terminates the other backends that hold a relation lock on +table+.
#
# Does nothing unless client.kill_backends is truthy. The query is sent through
# +run+ with its third argument set to true.
#
# @param client [Object] supplies the +kill_backends+ flag and +connection+
# @param table [String] relation name, interpolated into the query as given
# @return [Object, nil] whatever +run+ returns, or nil when the flag is off
def kill_backends(client, table)
  return unless client.kill_backends

  logger.info("Terminating other backends")

  terminate_sql = <<~SQL
    SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation' AND relation = \'#{table}\'::regclass::oid AND pid <> pg_backend_pid()
  SQL

  run(client.connection, terminate_sql, true)
end
|
.open_lock_exclusive(client, table) ⇒ Object
This function acquires the lock and keeps the transaction open. If the lock is acquired, it is up to the caller to call COMMIT to end the transaction. If the lock is not acquired, the transaction is closed and a new transaction is started to try to acquire the lock again.
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
# File 'lib/pg_online_schema_change/query.rb', line 253
# Tries to take an ACCESS EXCLUSIVE lock on client.table, retrying while the
# attempt counter stays below LOCK_ATTEMPT.
#
# The lock query goes through `run` with its third argument set to true, so a
# successful attempt is not committed here and the transaction stays open for
# the caller to COMMIT.
#
# @param client [Object] supplies wait_time_for_lock, table and connection
# @param table [String] passed to kill_backends between attempts
# @return [Boolean] true when the lock query succeeded, false once the retries are used up
def open_lock_exclusive(client, table)
# `||=` so the counter survives `retry`, which re-enters the method body from the top.
attempts ||= 1
query = <<~SQL
SET lock_timeout = '#{client.wait_time_for_lock}s';
LOCK TABLE #{client.table} IN ACCESS EXCLUSIVE MODE;
SQL
run(client.connection, query, true)
true
rescue PG::LockNotAvailable, PG::InFailedSqlTransaction
# The counter is incremented before it is compared against LOCK_ATTEMPT.
if (attempts += 1) < LOCK_ATTEMPT
logger.info("Couldn't acquire lock, attempt: #{attempts}")
# Clear lock_timeout on the connection, optionally terminate competing
# backends (kill_backends is a no-op unless enabled on the client), then retry.
run(client.connection, "RESET lock_timeout;")
kill_backends(client, table)
retry
end
# Out of attempts: clear lock_timeout and report failure to the caller.
logger.info("Lock acquire failed")
run(client.connection, "RESET lock_timeout;")
false
end
|
.primary_key_for(client, table) ⇒ Object
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
# File 'lib/pg_online_schema_change/query.rb', line 212
# Looks up a primary-key column name of +table+ within client.schema.
#
# The query can return several rows; only the first one is used.
#
# @param client [Object] supplies +schema+ and +connection+
# @param table [String] relation name, interpolated into the query as given
# @return [String, nil] the first "column_name" returned, or nil when there is none
def primary_key_for(client, table)
  primary_key_sql = <<~SQL
    SELECT
    pg_attribute.attname as column_name
    FROM pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
    pg_class.oid = \'#{table}\'::regclass AND
    indrelid = pg_class.oid AND
    nspname = \'#{client.schema}\' AND
    pg_class.relnamespace = pg_namespace.oid AND
    pg_attribute.attrelid = pg_class.oid AND
    pg_attribute.attnum = any(pg_index.indkey)
    AND indisprimary
  SQL

  key_columns = []
  run(client.connection, primary_key_sql) do |result|
    key_columns = result.map { |key_row| key_row["column_name"] }
  end
  key_columns.first
end
|
.referential_foreign_keys_to_refresh(client, table) ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
# File 'lib/pg_online_schema_change/query.rb', line 143
# Builds DROP + ADD statements for every foreign key that references +table+.
#
# Each constraint is re-added with NOT VALID; the suffix is appended only when
# the stored definition does not already end with it.
#
# @param client [Object] forwarded to get_all_constraints_for
# @param table [String] value compared against each row's "table_from"
# @return [String] the statement pairs concatenated without a separator ("" when none)
def referential_foreign_keys_to_refresh(client, table)
  inbound = get_all_constraints_for(client).select do |fk|
    fk["table_from"] == table && fk["constraint_type"] == "f"
  end

  refresh_statements = inbound.map do |fk|
    drop_sql = "ALTER TABLE #{fk["table_on"]} DROP CONSTRAINT #{fk["constraint_name"]};"
    add_sql =
      if fk["definition"].end_with?("NOT VALID")
        "ALTER TABLE #{fk["table_on"]} ADD CONSTRAINT #{fk["constraint_name"]} #{fk["definition"]};"
      else
        "ALTER TABLE #{fk["table_on"]} ADD CONSTRAINT #{fk["constraint_name"]} #{fk["definition"]} NOT VALID;"
      end

    "#{drop_sql} #{add_sql}"
  end
  refresh_statements.join
end
|
.renamed_columns(client) ⇒ Object
201
202
203
204
205
206
207
208
209
210
|
# File 'lib/pg_online_schema_change/query.rb', line 201
# Collects the old/new name pair of every RENAME statement in the client's
# ALTER statement.
#
# @param client [Object] supplies +alter_statement+ (SQL text)
# @return [Array<Hash>] one { old_name:, new_name: } hash per rename statement,
#   taken from the statement's +subname+ and +newname+
def renamed_columns(client)
  PgQuery.parse(client.alter_statement).tree.stmts.filter_map do |statement|
    rename = statement.stmt.rename_stmt
    { old_name: rename.subname, new_name: rename.newname } unless rename.nil?
  end
end
|
.run(connection, query, reuse_trasaction = false, &block) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/pg_online_schema_change/query.rb', line 45
# Executes +query+ on +connection+ inside an explicit transaction.
#
# BEGIN is always sent first. On success COMMIT is sent unless
# +reuse_trasaction+ is truthy, in which case the transaction is left open. On
# any exception the transaction is rolled back and the exception is re-raised.
#
# @param connection [Object] connection responding to transaction_status,
#   cancel, block and async_exec
# @param query [String] SQL text to execute
# @param reuse_trasaction [Boolean] when truthy, skip the COMMIT after success
# @param block [Proc] optional block forwarded to async_exec together with +query+
# @return [Object] whatever async_exec returns for +query+
# @raise [Exception] re-raises whatever was raised while running the query
def run(connection, query, reuse_trasaction = false, &block)
  # Cancel before sending anything new when the connection reports a failed or
  # unknown transaction state.
  connection.cancel if [PG::PQTRANS_INERROR, PG::PQTRANS_UNKNOWN].include?(connection.transaction_status)

  logger.debug("Running query", { query: query })

  connection.async_exec("BEGIN;")

  result = connection.async_exec(query, &block)
rescue Exception # rubocop:disable Lint/RescueException
  # Deliberately broad: the rollback below runs for every kind of failure, and
  # the exception is always re-raised rather than swallowed.
  connection.cancel if connection.transaction_status != PG::PQTRANS_IDLE
  connection.block
  logger.info("Exception raised, rolling back query", { rollback: true, query: query })
  connection.async_exec("ROLLBACK;")
  connection.async_exec("COMMIT;")
  raise
else
  connection.async_exec("COMMIT;") unless reuse_trasaction
  result
end
|
.same_table?(query) ⇒ Boolean
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/pg_online_schema_change/query.rb', line 24
# Reports whether all ALTER TABLE / RENAME statements in +query+ name exactly
# one relation.
#
# Statements of any other kind are ignored when collecting relation names.
#
# @param query [String] SQL text handed to PgQuery.parse
# @return [Boolean] true when exactly one distinct relation name was collected;
#   false otherwise, including when the query cannot be parsed
def same_table?(query)
  relation_names = PgQuery.parse(query).tree.stmts.filter_map do |statement|
    node = statement.stmt

    alter = node.alter_table_stmt
    next alter.relation.relname if alter.instance_of?(PgQuery::AlterTableStmt)

    rename = node.rename_stmt
    rename.relation.relname if rename.instance_of?(PgQuery::RenameStmt)
  end

  relation_names.uniq.size == 1
rescue PgQuery::ParseError
  false
end
|
.self_foreign_keys_to_refresh(client, table) ⇒ Object
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# File 'lib/pg_online_schema_change/query.rb', line 161
# Builds ADD CONSTRAINT statements for every foreign key declared on +table+.
#
# Each constraint is added with NOT VALID; the suffix is appended only when the
# stored definition does not already end with it. No DROP statement is emitted.
#
# @param client [Object] forwarded to get_all_constraints_for
# @param table [String] value compared against each row's "table_on"
# @return [String] the statements concatenated without a separator ("" when none)
def self_foreign_keys_to_refresh(client, table)
  own_foreign_keys = get_all_constraints_for(client).select do |constraint|
    constraint["table_on"] == table && constraint["constraint_type"] == "f"
  end

  add_statements = own_foreign_keys.map do |constraint|
    if constraint["definition"].end_with?("NOT VALID")
      "ALTER TABLE #{constraint["table_on"]} ADD CONSTRAINT #{constraint["constraint_name"]} #{constraint["definition"]};"
    else
      "ALTER TABLE #{constraint["table_on"]} ADD CONSTRAINT #{constraint["constraint_name"]} #{constraint["definition"]} NOT VALID;"
    end
  end
  add_statements.join
end
|
.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object
235
236
237
238
239
240
241
242
243
244
245
246
|
# File 'lib/pg_online_schema_change/query.rb', line 235
# Reads the storage parameters (pg_class.reloptions) recorded for +table+ as a
# comma-separated string.
#
# The lookup matches on relname only.
#
# @param client [Object] supplies +connection+
# @param table [String] relation name, interpolated into the query as given
# @param reuse_trasaction [Boolean] forwarded to +run+
# @return [String, nil] the "params" value of the first row, or nil when there is none
def storage_parameters_for(client, table, reuse_trasaction = false)
  reloptions_sql = <<~SQL
    SELECT array_to_string(reloptions, ',') as params FROM pg_class WHERE relname=\'#{table}\';
  SQL

  parameter_rows = []
  run(client.connection, reloptions_sql, reuse_trasaction) do |result|
    parameter_rows = result.map { |option_row| option_row["params"] }
  end
  parameter_rows.first
end
|
.table(query) ⇒ Object
38
39
40
41
42
43
|
# File 'lib/pg_online_schema_change/query.rb', line 38
# Extracts the name of the table that +query+ operates on.
#
# The first entry of the parse result's +tables+ list is preferred. When that
# list has no first entry, the relation name of the first RENAME statement is
# used instead. The query is parsed once and the fallback is only evaluated
# when it is needed.
#
# @param query [String] SQL text handed to PgQuery.parse
# @return [String, nil] the table name, or nil when neither source yields one
def table(query)
  parsed = PgQuery.parse(query)

  parsed.tables.first ||
    parsed.tree.stmts.filter_map { |statement| statement.stmt.rename_stmt&.relation&.relname }.first
end
|
.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/pg_online_schema_change/query.rb', line 65
# Lists the live columns of a table from pg_attribute, ordered by attnum.
#
# Rows with attnum <= 0 and dropped columns are excluded by the query. Every
# returned row is updated in place:
# - "column_name_regular" holds the name exactly as returned by the query
# - "column_name" holds that name passed through connection.quote_ident
# - "column_position" is converted with to_i
#
# @param client [Object] supplies +connection+ and the default +table+
# @param table [String, nil] relation to inspect; falls back to client.table
# @param reuse_trasaction [Boolean] forwarded to +run+
# @return [Array] the updated rows, or an empty array if +run+ never yields
def table_columns(client, table = nil, reuse_trasaction = false)
  columns_sql = <<~SQL
    SELECT attname as column_name, format_type(atttypid, atttypmod) as type, attnum as column_position FROM pg_attribute
    WHERE attrelid = \'#{table || client.table}\'::regclass AND attnum > 0 AND NOT attisdropped
    ORDER BY attnum;
  SQL

  columns = []
  run(client.connection, columns_sql, reuse_trasaction) do |result|
    columns = result.map do |row|
      plain_name = row["column_name"]
      row.update(
        "column_name_regular" => plain_name,
        "column_name" => client.connection.quote_ident(plain_name),
        "column_position" => row["column_position"].to_i
      )
    end
  end
  columns
end
|