Class: PgSlice::CLI
- Inherits:
-
Thor
- Object
- Thor
- PgSlice::CLI
- Includes:
- Helpers
- Defined in:
- lib/pgslice/cli.rb,
lib/pgslice/cli/fill.rb,
lib/pgslice/cli/prep.rb,
lib/pgslice/cli/swap.rb,
lib/pgslice/cli/unprep.rb,
lib/pgslice/cli/unswap.rb,
lib/pgslice/cli/analyze.rb,
lib/pgslice/cli/add_partitions.rb
Constant Summary
Constants included from Helpers
Class Attribute Summary collapse
-
.instance ⇒ Object
Returns the value of attribute instance.
Class Method Summary collapse
Instance Method Summary collapse
- #add_partitions(table) ⇒ Object
- #analyze(table) ⇒ Object
- #fill(table) ⇒ Object
-
#initialize(*args) ⇒ CLI
constructor
A new instance of CLI.
- #prep(table, column = nil, period = nil) ⇒ Object
- #swap(table) ⇒ Object
- #unprep(table) ⇒ Object
- #unswap(table) ⇒ Object
- #version ⇒ Object
Constructor Details
Class Attribute Details
.instance ⇒ Object
Returns the value of attribute instance.
4 5 6 |
# File 'lib/pgslice/cli.rb', line 4 def instance @instance end |
Class Method Details
.exit_on_failure? ⇒ Boolean
16 17 18 |
# File 'lib/pgslice/cli.rb', line 16 def self.exit_on_failure? ENV["PGSLICE_ENV"] != "test" end |
Instance Method Details
#add_partitions(table) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/pgslice/cli/add_partitions.rb', line 8 def add_partitions(table) original_table = create_table(table) table = [:intermediate] ? original_table.intermediate_table : original_table trigger_name = original_table.trigger_name assert_table(table) future = [:future] past = [:past] tablespace = [:tablespace] range = (-1 * past)..future period, field, cast, needs_comment, declarative, version = table.fetch_settings(original_table.trigger_name) unless period = "No settings found: #{table}" = "#{}\nDid you mean to use --intermediate?" unless [:intermediate] abort end queries = [] if needs_comment queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(table)} IS 'column:#{field},period:#{period},cast:#{cast}';" end # today = utc date today = round_date(Time.now.utc.to_date, period) schema_table = if !declarative table elsif [:intermediate] original_table else table.partitions.last end # indexes automatically propagate in Postgres 11+ if version < 3 index_defs = schema_table.index_defs fk_defs = schema_table.foreign_keys else index_defs = [] fk_defs = [] end primary_key = schema_table.primary_key tablespace_str = tablespace.empty? ? "" : " TABLESPACE #{quote_ident(tablespace)}" added_partitions = [] range.each do |n| day = advance_date(today, period, n) partition = Table.new(original_table.schema, "#{original_table.name}_#{day.strftime(name_format(period))}") next if partition.exists? added_partitions << partition if declarative queries << <<-SQL CREATE TABLE #{quote_table(partition)} PARTITION OF #{quote_table(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)})#{tablespace_str}; SQL else queries << <<-SQL CREATE TABLE #{quote_table(partition)} (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)})) INHERITS (#{quote_table(table)})#{tablespace_str}; SQL end queries << "ALTER TABLE #{quote_table(partition)} ADD PRIMARY KEY (#{primary_key.map { |k| quote_ident(k) }.join(", ")});" if primary_key.any? index_defs.each do |index_def| queries << make_index_def(index_def, partition) end fk_defs.each do |fk_def| queries << make_fk_def(fk_def, partition) end end unless declarative # update trigger based on existing partitions current_defs = [] future_defs = [] past_defs = [] name_format = self.name_format(period) partitions = (table.partitions + added_partitions).uniq(&:name).sort_by(&:name) partitions.each do |partition| day = partition_date(partition, name_format) # note: does not support generated columns # could support by listing columns # but this would cause issues with schema changes sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN INSERT INTO #{quote_table(partition)} VALUES (NEW.*);" if day.to_date < today past_defs << sql elsif advance_date(day, period, 1) < today current_defs << sql else future_defs << sql end end # order by current period, future periods asc, past periods desc trigger_defs = current_defs + future_defs + past_defs.reverse if trigger_defs.any? queries << <<-SQL CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN IF #{trigger_defs.join("\n ELSIF ")} ELSE RAISE EXCEPTION 'Date out of range. Ensure partitions are created.'; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; SQL end end run_queries(queries) if queries.any? end |
#analyze(table) ⇒ Object
5 6 7 8 9 10 11 |
# File 'lib/pgslice/cli/analyze.rb', line 5 def analyze(table) table = create_table(table) parent_table = [:swapped] ? table : table.intermediate_table analyze_list = parent_table.partitions + [parent_table] run_queries_without_transaction(analyze_list.map { |t| "ANALYZE VERBOSE #{quote_table(t)};" }) end |
#fill(table) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/pgslice/cli/fill.rb', line 11 def fill(table) table = create_table(table) source_table = create_table([:source_table]) if [:source_table] dest_table = create_table([:dest_table]) if [:dest_table] if [:swapped] source_table ||= table.retired_table dest_table ||= table else source_table ||= table dest_table ||= table.intermediate_table end assert_table(source_table) assert_table(dest_table) period, field, cast, _, declarative, _ = dest_table.fetch_settings(table.trigger_name) if period name_format = self.name_format(period) partitions = dest_table.partitions if partitions.any? starting_time = partition_date(partitions.first, name_format) ending_time = advance_date(partition_date(partitions.last, name_format), period, 1) end end schema_table = period && declarative ? partitions.last : table primary_key = schema_table.primary_key[0] abort "No primary key" unless primary_key max_source_id = nil begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction abort "Only numeric primary keys are supported" end max_dest_id = if [:start] [:start] elsif [:swapped] dest_table.max_id(primary_key, where: [:where], below: max_source_id) else dest_table.max_id(primary_key, where: [:where]) end if max_dest_id == 0 && ![:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, [:where]) max_dest_id = min_source_id - 1 if min_source_id end starting_id = max_dest_id fields = source_table.columns.map { |c| quote_ident(c) }.join(", ") batch_size = [:batch_size] i = 1 batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil if batch_count == 0 log_sql "/* nothing to fill */" end while starting_id < max_source_id where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}" if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end if [:where] where << " AND #{[:where]}" end query = <<-SQL /* #{i} of #{batch_count} */ INSERT INTO #{quote_table(dest_table)} (#{fields}) SELECT #{fields} FROM #{quote_table(source_table)} WHERE #{where} SQL run_query(query) starting_id += batch_size i += 1 if [:sleep] && starting_id <= max_source_id sleep([:sleep]) end end end |
#prep(table, column = nil, period = nil) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/pgslice/cli/prep.rb', line 7 def prep(table, column=nil, period=nil) table = create_table(table) intermediate_table = table.intermediate_table trigger_name = table.trigger_name unless [:partition] abort "Usage: \"pgslice prep TABLE --no-partition\"" if column || period abort "Can't use --trigger-based and --no-partition" if [:trigger_based] end assert_table(table) assert_no_table(intermediate_table) if [:partition] abort "Usage: \"pgslice prep TABLE COLUMN PERIOD\"" if !(column && period) abort "Column not found: #{column}" unless table.columns.include?(column) abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] end queries = [] # version summary # 1. trigger-based (pg9) # 2. declarative, with indexes and foreign keys on child tables (pg10) # 3. declarative, with indexes and foreign keys on parent table (pg11+) version = [:test_version] || ([:trigger_based] ? 1 : 3) declarative = version > 1 if declarative && [:partition] including = ["DEFAULTS", "CONSTRAINTS", "STORAGE", "COMMENTS", "STATISTICS"] if server_version_num >= 120000 including << "GENERATED" end if server_version_num >= 140000 including << "COMPRESSION" end queries << <<-SQL CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} #{including.map { |v| "INCLUDING #{v}" }.join(" ")}) PARTITION BY RANGE (#{quote_ident(column)}); SQL if version == 3 index_defs = table.index_defs index_defs.each do |index_def| queries << make_index_def(index_def, intermediate_table) end table.foreign_keys.each do |fk_def| queries << make_fk_def(fk_def, intermediate_table) end end # add comment cast = table.column_cast(column) queries << <<-SQL COMMENT ON TABLE #{quote_table(intermediate_table)} IS 'column:#{column},period:#{period},cast:#{cast},version:#{version}'; SQL else queries << <<-SQL CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING ALL); SQL table.foreign_keys.each do |fk_def| queries << make_fk_def(fk_def, intermediate_table) end end if [:partition] && !declarative queries << <<-SQL CREATE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN RAISE EXCEPTION 'Create partitions first.'; END; $$ LANGUAGE plpgsql; SQL queries << <<-SQL CREATE TRIGGER #{quote_ident(trigger_name)} BEFORE INSERT ON #{quote_table(intermediate_table)} FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}(); SQL cast = table.column_cast(column) queries << <<-SQL COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(intermediate_table)} IS 'column:#{column},period:#{period},cast:#{cast}'; SQL end run_queries(queries) end |
#swap(table) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/pgslice/cli/swap.rb', line 5 def swap(table) table = create_table(table) intermediate_table = table.intermediate_table retired_table = table.retired_table assert_table(table) assert_table(intermediate_table) assert_no_table(retired_table) queries = [ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};", "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};" ] table.sequences.each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};" end queries.unshift("SET LOCAL lock_timeout = #{escape_literal([:lock_timeout])};") run_queries(queries) end |
#unprep(table) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/pgslice/cli/unprep.rb', line 4 def unprep(table) table = create_table(table) intermediate_table = table.intermediate_table trigger_name = table.trigger_name assert_table(intermediate_table) queries = [ "DROP TABLE #{quote_table(intermediate_table)} CASCADE;", "DROP FUNCTION IF EXISTS #{quote_ident(trigger_name)}();" ] run_queries(queries) end |
#unswap(table) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pgslice/cli/unswap.rb', line 4 def unswap(table) table = create_table(table) intermediate_table = table.intermediate_table retired_table = table.retired_table assert_table(table) assert_table(retired_table) assert_no_table(intermediate_table) queries = [ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};", "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};" ] table.sequences.each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};" end run_queries(queries) end |