23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/pg_logical_replicator/cli.rb', line 23
# Sets up logical replication for the source database's tables, one
# publication/slot pair per group of tables.
#
# Lists the tables of the source database's `public` schema (ordered by name),
# splits them into consecutive groups of `tables.size / num_slots` tables (at
# least one table per group), and runs a LogicalReplicationInitializer for
# every group, or only for the group numbers listed in `options[:groups]`.
# Each group gets a scratch dump directory under `~/db-dumps/<source_database>/`
# which is removed again once the group has been processed.
#
# Options read from `options`:
#   :source_host, :source_port, :source_database, :source_username,
#   :source_password          - source connection settings
#   :target_host, :target_port - target connection settings
#   :target_database           - defaults to :source_database
#   :target_username / :target_password         - default to the source ones
#   :target_rep_username / :target_rep_password - default to the target ones
#   :num_slots                 - divisor used to size the table groups
#   :groups                    - optional list of 1-based group numbers to run
#
# A StandardError raised while processing one group is printed and does not
# stop the remaining groups.
#
# @raise [ArgumentError] if :num_slots is not a positive number
def setup
  num_slots = options[:num_slots]
  # Fail fast with a clear message instead of a TypeError/ZeroDivisionError
  # after the database has already been queried.
  unless num_slots.is_a?(Numeric) && num_slots.positive?
    raise ArgumentError, "num_slots must be a positive number (got #{num_slots.inspect})"
  end

  source_host = options[:source_host]
  source_port = options[:source_port]
  source_database = options[:source_database]
  source_username = options[:source_username]
  source_password = options[:source_password]

  target_host = options[:target_host]
  target_port = options[:target_port]
  target_database = options[:target_database] || source_database
  target_username = options[:target_username] || source_username
  target_password = options[:target_password] || source_password
  target_rep_username = options[:target_rep_username] || target_username
  target_rep_password = options[:target_rep_password] || target_password

  target_groups = options[:groups].map(&:to_i) if options[:groups]

  # NOTE(review): values are interpolated unquoted, so a value containing
  # whitespace would not survive connection-string parsing. Confirm how
  # LogicalReplicationInitializer consumes these strings before adding quoting.
  conn_str = lambda do |host, port, dbname, user, password|
    "host=#{host} port=#{port} dbname=#{dbname} user=#{user} password=#{password}"
  end

  conn = PG.connect(dbname: source_database, user: source_username, password: source_password,
                    host: source_host, port: source_port)
  query = "SELECT tablename FROM pg_tables WHERE schemaname = 'public' order by tablename ASC;"
  tables = begin
    conn.exec(query).map { |row| row['tablename'] }
  ensure
    conn.close if conn
  end
  puts "Total Tables: #{tables.size}"

  # Floor division yields 0 when there are fewer tables than slots (or no
  # tables at all), and each_slice(0) raises; clamp to one table per group.
  # NOTE: because of the floor, leftover tables form an extra trailing group,
  # so there can be more groups than num_slots. Kept as-is so group numbering
  # (and thus slot/publication names) stays stable.
  tables_per_group = [tables.size / num_slots, 1].max
  puts "Tables per group: #{tables_per_group}"

  home_directory = Dir.home
  puts "Home directory: #{home_directory}"
  dump_dir_root = "#{home_directory}/db-dumps/#{source_database}"
  # FileUtils instead of shelling out: safe for paths containing spaces or
  # shell metacharacters, and raises if the directory cannot be created.
  FileUtils.mkdir_p(dump_dir_root)
  puts "Target Groups: #{target_groups}"

  tables.each_slice(tables_per_group).with_index do |table_group, group_idx|
    puts "Processing group #{group_idx + 1} size: #{table_group.size}"
    group_number = group_idx + 1
    # Skip groups that were not requested; nil target_groups means "all".
    next unless target_groups.nil? || target_groups.include?(group_number)

    table_names = table_group.join(',')
    dump_dir = "#{dump_dir_root}/#{group_number}"
    # Start from a clean directory in case a previous run left files behind.
    puts "Removing directory: #{dump_dir}"
    FileUtils.rm_rf(dump_dir)
    begin
      puts "Creating directory: #{dump_dir}"
      FileUtils.mkdir_p(dump_dir)
      LogicalReplicationInitializer.new({
        slot_name: "logical_sub_grp_#{group_number}",
        publication_name: "logical_pub_grp_#{group_number}",
        primary_conn_str: conn_str.call(source_host, source_port, source_database,
                                        source_username, source_password),
        target_conn_str: conn_str.call(target_host, target_port, target_database,
                                       target_username, target_password),
        target_rep_conn_str: conn_str.call(target_host, target_port, target_database,
                                           target_rep_username, target_rep_password),
        table_names: table_names,
        dump_dir: dump_dir
      }).start
    ensure
      # Always drop the scratch dump directory, even when the initializer fails.
      puts "Removing directory: #{dump_dir}"
      FileUtils.rm_rf(dump_dir)
    end
  rescue StandardError => e
    # Best effort per group: report the failure and continue with the next one.
    puts "Error: #{e.message}"
  end
end
|