Class: PgSync::TableSync
- Inherits: Object
  - Object
    - PgSync::TableSync
- Includes: Utils
- Defined in: lib/pgsync/table_sync.rb
Instance Method Summary
- #sync(config, table, opts, source_url, destination_url) ⇒ Object
Methods included from Utils
#colorize, #config_file, #db_config_file, #log, #search_tree
Instance Method Details
#sync(config, table, opts, source_url, destination_url) ⇒ Object
Source (lib/pgsync/table_sync.rb, lines 5–181):
# File 'lib/pgsync/table_sync.rb', line 5

# Copies the rows of one table from the source database to the destination
# database, then copies the current values of the table's shared sequences.
#
# Only columns present on both sides are copied. Columns/sequences that exist on
# just one side are listed in the returned message. Sequences are skipped
# entirely when opts[:no_sequences] is set.
#
# Copy strategy, chosen from opts:
# * opts[:in_batches]: COPY rows in primary-key ranges of opts[:batch_size],
#   resuming after the destination's current max id and sleeping opts[:sleep]
#   seconds between batches. Incompatible with opts[:overwrite]; needs a
#   primary key; truncates first when opts[:truncate] is set.
# * opts[:overwrite], opts[:preserve] or opts[:sql] (and not opts[:truncate]):
#   stage the rows in a temporary table, then either insert only rows whose
#   primary key is absent from the destination (preserve) or delete the
#   matching rows and re-insert them in one transaction. Needs a primary key.
# * otherwise: truncate the destination table and COPY every row.
#
# @param config [Hash] sync configuration; config["data_rules"] (unless
#   opts[:no_rules]) is matched per column via rule_match? and rewritten via
#   apply_strategy
# @param table [String] table name, quoted with quote_ident_full
# @param opts [Hash] options described above
# @param source_url [String] source connection, passed to DataSource.new
# @param destination_url [String] destination connection, passed to DataSource.new
# @return [Hash] {status: "success", message:, time:} on success (message is nil
#   when there is nothing to note), {status: "success", message: "No fields to copy"}
#   when no columns are shared, or {status: "error", message:} when a
#   StandardError is raised.
def sync(config, table, opts, source_url, destination_url)
  start_time = Time.now
  source = DataSource.new(source_url, timeout: 0)
  destination = DataSource.new(destination_url, timeout: 0)

  begin
    from_connection = source.conn
    to_connection = destination.conn

    # data rules as [pattern, strategy] pairs; nil/absent rules become []
    rules = (opts[:no_rules] ? [] : config["data_rules"]).to_a

    from_fields = source.columns(table)
    to_fields = destination.columns(table)
    shared_fields = to_fields & from_fields
    extra_fields = to_fields - from_fields
    missing_fields = from_fields - to_fields

    if opts[:no_sequences]
      from_sequences = []
      to_sequences = []
    else
      from_sequences = source.sequences(table, shared_fields)
      to_sequences = destination.sequences(table, shared_fields)
    end

    shared_sequences = to_sequences & from_sequences
    extra_sequences = to_sequences - from_sequences
    missing_sequences = from_sequences - to_sequences

    # mutable (String.new) so it can be appended to regardless of frozen literals
    sql_clause = String.new
    sql_clause << " #{opts[:sql]}" if opts[:sql]

    notes = []
    notes << "Extra columns: #{extra_fields.join(", ")}" if extra_fields.any?
    notes << "Missing columns: #{missing_fields.join(", ")}" if missing_fields.any?
    notes << "Extra sequences: #{extra_sequences.join(", ")}" if extra_sequences.any?
    notes << "Missing sequences: #{missing_sequences.join(", ")}" if missing_sequences.any?

    return {status: "success", message: "No fields to copy"} if shared_fields.empty?

    primary_key = destination.primary_key(table)
    quoted_table = quote_ident_full(table)

    # SELECT list for the source query: columns matched by a data rule are
    # replaced by the rule's expression, everything else is copied verbatim
    select_exprs = shared_fields.map do |f|
      rule = rules.find { |bf, _| rule_match?(table, f, bf) }
      if rule
        "#{apply_strategy(rule[1], table, f, primary_key)} AS #{quote_ident(f)}"
      else
        "#{quoted_table}.#{quote_ident(f)}"
      end
    end
    copy_fields = select_exprs.join(", ")
    fields = shared_fields.map { |f| quote_ident(f) }.join(", ")

    # read sequence values before copying rows, as the original ordering did
    seq_values = {}
    shared_sequences.each do |seq|
      seq_values[seq] = source.last_value(seq)
    end

    copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table}#{sql_clause}) TO STDOUT"

    if opts[:in_batches]
      raise Error, "Cannot use --overwrite with --in-batches" if opts[:overwrite]
      raise Error, "No primary key" unless primary_key

      quoted_pk = quote_ident(primary_key)
      destination.truncate(table) if opts[:truncate]

      from_max_id = source.max_id(table, primary_key)
      to_max_id = destination.max_id(table, primary_key) + 1

      if to_max_id == 1
        # destination max id was 0 (presumably an empty table): start from the
        # source's lowest id instead of 1 when it is positive
        from_min_id = source.min_id(table, primary_key)
        to_max_id = from_min_id if from_min_id > 0
      end

      starting_id = to_max_id
      batch_size = opts[:batch_size]

      i = 1
      batch_count = ((from_max_id - starting_id + 1) / batch_size.to_f).ceil

      while starting_id <= from_max_id
        where = "#{quoted_pk} >= #{starting_id} AND #{quoted_pk} < #{starting_id + batch_size}"
        log " #{i}/#{batch_count}: #{where}"

        # TODO be smarter for advance sql clauses
        batch_sql_clause = " #{sql_clause.length > 0 ? "#{sql_clause} AND" : "WHERE"} #{where}"

        batch_copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table}#{batch_sql_clause}) TO STDOUT"
        to_connection.copy_data "COPY #{quoted_table} (#{fields}) FROM STDIN" do
          from_connection.copy_data batch_copy_to_command do
            while (row = from_connection.get_copy_data)
              to_connection.put_copy_data(row)
            end
          end
        end

        starting_id += batch_size
        i += 1

        sleep(opts[:sleep]) if opts[:sleep] && starting_id <= from_max_id
      end

      log # add extra line for spinner
    elsif !opts[:truncate] && (opts[:overwrite] || opts[:preserve] || !sql_clause.empty?)
      raise Error, "No primary key" unless primary_key

      quoted_pk = quote_ident(primary_key)
      temp_table = "pgsync_#{rand(1_000_000_000)}"
      quoted_temp_table = quote_ident_full(temp_table)
      file = Tempfile.new(temp_table)
      begin
        # stage the source rows in a local file
        from_connection.copy_data copy_to_command do
          while (row = from_connection.get_copy_data)
            file.write(row)
          end
        end
        file.rewind

        # create a temp table with the destination's columns (no rows)
        to_connection.exec("CREATE TEMPORARY TABLE #{quoted_temp_table} AS SELECT * FROM #{quoted_table} WITH NO DATA")

        # load file
        to_connection.copy_data "COPY #{quoted_temp_table} (#{fields}) FROM STDIN" do
          file.each do |row|
            to_connection.put_copy_data(row)
          end
        end

        # Insert with an explicit column list: the temp table only has values for
        # the shared columns, so `SELECT *` would write NULL into destination-only
        # columns instead of letting them take their defaults.
        if opts[:preserve]
          # insert only rows whose primary key is not already in the destination
          to_connection.exec("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quoted_temp_table} WHERE NOT EXISTS (SELECT 1 FROM #{quoted_table} WHERE #{quoted_table}.#{quoted_pk} = #{quoted_temp_table}.#{quoted_pk}))")
        else
          # replace matching rows atomically
          to_connection.transaction do
            to_connection.exec("DELETE FROM #{quoted_table} WHERE #{quoted_pk} IN (SELECT #{quoted_pk} FROM #{quoted_temp_table})")
            to_connection.exec("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quoted_temp_table})")
          end
        end
      ensure
        file.close
        file.unlink
      end
    else
      destination.truncate(table)
      to_connection.copy_data "COPY #{quoted_table} (#{fields}) FROM STDIN" do
        from_connection.copy_data copy_to_command do
          while (row = from_connection.get_copy_data)
            to_connection.put_copy_data(row)
          end
        end
      end
    end

    seq_values.each do |seq, value|
      to_connection.exec("SELECT setval(#{escape(seq)}, #{escape(value)})")
    end

    message = notes.any? ? notes.join(", ") : nil
    {status: "success", message: message, time: (Time.now - start_time).round(1)}
  ensure
    source.close
    destination.close
  end
rescue => e
  message =
    case e
    when PG::ConnectionBad
      # likely fine to show simplified message here
      # the full message will be shown when first trying to connect
      "Connection failed"
    when Error
      e.message
    else
      # includes non-connection PG errors (e.g. a bad --sql clause or a
      # constraint violation), which must not be reported as connection failures
      "#{e.class.name}: #{e.message}"
    end
  {status: "error", message: message}
end