3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/pgsync/table_sync.rb', line 3
# Copies a single table's rows from the source database to the destination.
#
# mutex           - Mutex serializing log output across concurrent table syncs
# config          - parsed config hash; "data_rules" maps column patterns to
#                   obfuscation strategies applied during the copy
# table           - qualified table name (e.g. "public.users")
# opts            - per-run options (:sql, :no_rules, :truncate, :overwrite,
#                   :preserve, :in_batches, :batch_size, :sleep)
# source_url / destination_url - connection URLs for the two databases
# first_schema    - schema prefix stripped from the table name for display only
#
# Raises PgSync::Error for unsupported option combinations or when a
# primary key is required but absent. Connections are always closed.
def sync(mutex, config, table, opts, source_url, destination_url, first_schema)
  start_time = Time.now
  # timeout: 0 — disable statement timeouts for potentially long-running copies
  source = DataSource.new(source_url, timeout: 0)
  destination = DataSource.new(destination_url, timeout: 0)
  begin
    from_connection = source.conn
    to_connection = destination.conn

    # Data rules obfuscate sensitive columns unless --no-rules was given.
    bad_fields = opts[:no_rules] ? [] : config["data_rules"]

    # Only columns present on both sides are copied; the rest are reported.
    from_fields = source.columns(table)
    to_fields = destination.columns(table)
    shared_fields = to_fields & from_fields
    extra_fields = to_fields - from_fields
    missing_fields = from_fields - to_fields

    from_sequences = source.sequences(table, shared_fields)
    to_sequences = destination.sequences(table, shared_fields)
    shared_sequences = to_sequences & from_sequences
    extra_sequences = to_sequences - from_sequences
    missing_sequences = from_sequences - to_sequences

    sql_clause = String.new
    table_name = table.sub("#{first_schema}.", "")

    mutex.synchronize do
      log "* Syncing #{table_name}"
      if opts[:sql]
        log " #{opts[:sql]}"
        sql_clause << " #{opts[:sql]}"
      end
      log " Extra columns: #{extra_fields.join(", ")}" if extra_fields.any?
      log " Missing columns: #{missing_fields.join(", ")}" if missing_fields.any?
      log " Extra sequences: #{extra_sequences.join(", ")}" if extra_sequences.any?
      log " Missing sequences: #{missing_sequences.join(", ")}" if missing_sequences.any?
      if shared_fields.empty?
        log " No fields to copy"
      end
    end

    if shared_fields.any?
      primary_key = destination.primary_key(table)

      # Columns matching a data rule are replaced by that rule's SQL expression,
      # aliased back to the column name; other columns are copied verbatim.
      copy_fields = shared_fields.map { |f| f2 = bad_fields.to_a.find { |bf, _| rule_match?(table, f, bf) }; f2 ? "#{apply_strategy(f2[1], table, f, primary_key)} AS #{quote_ident(f)}" : "#{quote_ident_full(table)}.#{quote_ident(f)}" }.join(", ")
      fields = shared_fields.map { |f| quote_ident(f) }.join(", ")

      # Capture source sequence positions up front so they can be replayed
      # on the destination after the rows are copied.
      seq_values = {}
      shared_sequences.each do |seq|
        seq_values[seq] = source.last_value(seq)
      end

      copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quote_ident_full(table)}#{sql_clause}) TO STDOUT"
      if opts[:in_batches]
        # Batched mode: stream primary-key ranges, resuming past the
        # destination's current max id. Incompatible with --overwrite.
        raise PgSync::Error, "Cannot use --overwrite with --in-batches" if opts[:overwrite]
        raise PgSync::Error, "No primary key" unless primary_key

        destination.truncate(table) if opts[:truncate]

        from_max_id = source.max_id(table, primary_key)
        to_max_id = destination.max_id(table, primary_key) + 1

        if to_max_id == 1
          # Destination is empty — start from the source's minimum id instead.
          from_min_id = source.min_id(table, primary_key)
          to_max_id = from_min_id if from_min_id > 0
        end

        starting_id = to_max_id
        batch_size = opts[:batch_size]
        i = 1
        batch_count = ((from_max_id - starting_id + 1) / batch_size.to_f).ceil

        while starting_id <= from_max_id
          # Half-open range [starting_id, starting_id + batch_size).
          where = "#{quote_ident(primary_key)} >= #{starting_id} AND #{quote_ident(primary_key)} < #{starting_id + batch_size}"
          log " #{i}/#{batch_count}: #{where}"

          # AND the range onto a user-supplied clause, or begin a fresh WHERE.
          batch_sql_clause = " #{sql_clause.length > 0 ? "#{sql_clause} AND" : "WHERE"} #{where}"
          batch_copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quote_ident_full(table)}#{batch_sql_clause}) TO STDOUT"

          to_connection.copy_data "COPY #{quote_ident_full(table)} (#{fields}) FROM STDIN" do
            from_connection.copy_data batch_copy_to_command do
              while (row = from_connection.get_copy_data)
                to_connection.put_copy_data(row)
              end
            end
          end

          starting_id += batch_size
          i += 1

          # Optional throttle between batches (skipped after the final one).
          if opts[:sleep] && starting_id <= from_max_id
            sleep(opts[:sleep])
          end
        end
      elsif !opts[:truncate] && (opts[:overwrite] || opts[:preserve] || !sql_clause.empty?)
        # Merge mode: stage rows in a temp table, then insert/overwrite by
        # primary key. Used for --overwrite, --preserve, or a custom SQL filter.
        raise PgSync::Error, "No primary key" unless primary_key

        temp_table = "pgsync_#{rand(1_000_000_000)}"
        file = Tempfile.new(temp_table)
        begin
          # Spool the source rows to disk first so the COPY OUT and COPY IN
          # streams don't have to run simultaneously.
          from_connection.copy_data copy_to_command do
            while (row = from_connection.get_copy_data)
              file.write(row)
            end
          end
          file.rewind

          to_connection.exec("CREATE TEMPORARY TABLE #{quote_ident_full(temp_table)} AS SELECT * FROM #{quote_ident_full(table)} WITH NO DATA")
          to_connection.copy_data "COPY #{quote_ident_full(temp_table)} (#{fields}) FROM STDIN" do
            file.each do |row|
              to_connection.put_copy_data(row)
            end
          end

          if opts[:preserve]
            # Keep existing destination rows; insert only rows whose primary
            # key is not already present.
            to_connection.exec("INSERT INTO #{quote_ident_full(table)} (SELECT * FROM #{quote_ident_full(temp_table)} WHERE NOT EXISTS (SELECT 1 FROM #{quote_ident_full(table)} WHERE #{quote_ident_full(table)}.#{quote_ident(primary_key)} = #{quote_ident_full(temp_table)}.#{quote_ident(primary_key)}))")
          else
            # Overwrite: delete matching rows and insert the staged copies
            # inside one transaction so readers never see a partial state.
            to_connection.transaction do
              to_connection.exec("DELETE FROM #{quote_ident_full(table)} WHERE #{quote_ident(primary_key)} IN (SELECT #{quote_ident(primary_key)} FROM #{quote_ident_full(temp_table)})")
              to_connection.exec("INSERT INTO #{quote_ident_full(table)} (SELECT * FROM #{quote_ident(temp_table)})")
            end
          end
        ensure
          file.close
          file.unlink
        end
      else
        # Simple mode: truncate the destination and stream rows straight
        # from one connection to the other.
        destination.truncate(table)
        to_connection.copy_data "COPY #{quote_ident_full(table)} (#{fields}) FROM STDIN" do
          from_connection.copy_data copy_to_command do
            while (row = from_connection.get_copy_data)
              to_connection.put_copy_data(row)
            end
          end
        end
      end

      # Replay the sequence positions captured from the source.
      seq_values.each do |seq, value|
        to_connection.exec("SELECT setval(#{escape(seq)}, #{escape(value)})")
      end
    end

    mutex.synchronize do
      log "* DONE #{table_name} (#{(Time.now - start_time).round(1)}s)"
    end
  ensure
    source.close
    destination.close
  end
end
|