Class: LogicalReplicationInitializer
- Inherits:
-
Object
- Object
- LogicalReplicationInitializer
- Defined in:
- lib/pg_logical_replicator/logical_replication_initializer.rb
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #advance_subscription_origin(lsn) ⇒ Object
- #connect_to_databases ⇒ Object
- #create_logical_replication_slot ⇒ Object
- #create_publication ⇒ Object
- #create_subscription ⇒ Object
- #drop_publication ⇒ Object
- #drop_subscription ⇒ Object
- #dump(snapshot) ⇒ Object
- #enable_subscription ⇒ Object
-
#initialize(args) ⇒ LogicalReplicationInitializer
constructor
A new instance of LogicalReplicationInitializer.
- #restore_dump ⇒ Object
- #setup_logger ⇒ Object
- #start ⇒ Object
- #truncate_table(table_name) ⇒ Object
- #validate_options ⇒ Object
- #with_snapshot ⇒ Object
Constructor Details
#initialize(args) ⇒ LogicalReplicationInitializer
14 15 16 17 18 19 20 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 14 def initialize(args) @options = args setup_logger connect_to_databases end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
12 13 14 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 12 def @options end |
Instance Method Details
#advance_subscription_origin(lsn) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 180 def advance_subscription_origin(lsn) @log.info("Setting replication origin position to #{lsn}") query = <<~SQL SELECT pg_replication_origin_advance('pg_' || subid::text, '#{lsn}') FROM pg_stat_subscription WHERE subname = '#{@slot}' SQL @target.exec(query) end |
#connect_to_databases ⇒ Object
71 72 73 74 75 76 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 71 def connect_to_databases @primary = PG.connect(@primary_conn_str) @secondary = PG.connect(@secondary_conn_str) @target = PG.connect(@target_conn_str) @target_rep = PG.connect(@target_rep_conn_str) end |
#create_logical_replication_slot ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 103 def create_logical_replication_slot @primary.exec("SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1", [@slot]) @primary.exec("SELECT pg_create_logical_replication_slot($1, $2)", [@slot, "pgoutput"]) do |res| slot_name = res.getvalue(0, 0) @log.info("Created logical replication slot #{slot_name}") end end |
#create_publication ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 78 def create_publication @primary.exec("SELECT COUNT(*) FROM pg_publication WHERE pubname = $1", [@publication_name]) do |res| cnt = res.getvalue(0, 0).to_i if cnt.zero? pub_name = PG::Connection.quote_ident(@publication_name) table_list = @table_names.split(',') @primary.exec("CREATE PUBLICATION #{pub_name} FOR TABLE #{PG::Connection.quote_ident(table_list[0])}") table_list[1..].each do |table| begin @log.info("Processing table: #{table}") @primary.exec("ALTER PUBLICATION #{pub_name} ADD TABLE #{PG::Connection.quote_ident(table)}") rescue StandardError => e @log.error("Error adding table #{table}: #{e.}") end end @log.info("Created publication #{@publication_name}") else @log.info("Publication #{@publication_name} already exists") end end end |
#create_subscription ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 164 def create_subscription @log.info("Creating subscription #{@slot}") pub_name = PG::Connection.quote_ident(@publication_name) sub_name = PG::Connection.quote_ident(@slot) query = <<~SQL CREATE SUBSCRIPTION #{sub_name} CONNECTION '#{@subscription_conn_str}' PUBLICATION #{pub_name} WITH (create_slot=false, slot_name=#{sub_name}, enabled=false, copy_data=false) SQL @target.exec(query) end |
#drop_publication ⇒ Object
197 198 199 200 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 197 def drop_publication @log.info("Dropping publication #{@publication_name}") @primary.exec("DROP PUBLICATION IF EXISTS #{PG::Connection.quote_ident(@publication_name)}") end |
#drop_subscription ⇒ Object
202 203 204 205 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 202 def drop_subscription @log.info("Dropping subscription #{@slot}") @target.exec("DROP SUBSCRIPTION IF EXISTS #{PG::Connection.quote_ident(@slot)}") end |
#dump(snapshot) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 119 def dump(snapshot) @log.info("Dumping source DB") table_list = @table_names.split(',') cmd = [ 'pg_dump', '--no-publication', '--no-subscription', "--snapshot=#{snapshot}", '--format=d', '--data-only', '--jobs=8', '-f', @dump_dir, @secondary_conn_str ] table_list.each do |table| @log.info("Dumping table #{table}") cmd << '-t' << table end @log.info("Executing DB Dump Command: #{cmd.join(' ')}") stdout, stderr, status = Open3.capture3(*cmd) unless status.success? @log.error("Error in pg_dump: #{stderr}") raise "pg_dump failed" end end |
#enable_subscription ⇒ Object
192 193 194 195 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 192 def enable_subscription @log.info("Enabling subscription #{@slot}") @target.exec("ALTER SUBSCRIPTION #{PG::Connection.quote_ident(@slot)} ENABLE") end |
#restore_dump ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 152 def restore_dump table_names = @options[:table_names].split(',') table_names.each { |table| truncate_table(table) } @log.info("Restoring dump to target DB") stdout, stderr, status = Open3.capture3('pg_restore', '--format=d', '--jobs=8', '--data-only', '-d', @target_rep_conn_str, @dump_dir) unless status.success? @log.error("Error in pg_restore: #{stderr}") raise "pg_restore failed" end end |
#setup_logger ⇒ Object
22 23 24 25 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 22 def setup_logger @log = Logger.new(STDOUT) @log.level = Logger::INFO end |
#start ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 43 def start begin @log.info("Dumping to #{@dump_dir}") connect_to_databases drop_subscription drop_publication create_publication create_logical_replication_slot lsn = with_snapshot do |lsn, snapshot| @log.info("Creating Snapshot with lsn #{lsn}") dump(snapshot) lsn end restore_dump create_subscription advance_subscription_origin(lsn) enable_subscription rescue StandardError => e @log.error(e.) exit 1 end end |
#truncate_table(table_name) ⇒ Object
148 149 150 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 148 def truncate_table(table_name) @target_rep.exec("TRUNCATE TABLE #{PG::Connection.quote_ident(table_name)}") end |
#validate_options ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 27 def [:primary_conn_str, :target_conn_str, :target_rep_conn_str, :table_names, :dump_dir].each do |opt| raise OptionParser::MissingArgument, opt.to_s unless @options[opt] end @primary_conn_str = [:primary_conn_str] || (raise OptionParser::MissingArgument, '--primary-conn-str is missing') @secondary_conn_str = [:secondary_conn_str] || @primary_conn_str @target_conn_str = [:target_conn_str] || (raise OptionParser::MissingArgument, '--target-conn-str is missing') @target_rep_conn_str = [:target_rep_conn_str] || (raise OptionParser::MissingArgument, '--target-rep-conn-str is missing') @table_names = [:table_names] || (raise OptionParser::MissingArgument, '--table-names is missing') @subscription_conn_str = [:subscription_conn_str] || @primary_conn_str @dump_dir = [:dump_dir] || (raise OptionParser::MissingArgument, '--dump-dir is missing') @slot = [:slot_name] @publication_name = [:publication_name] end |
#with_snapshot ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 111 def with_snapshot @secondary.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ") @secondary.exec("SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END, pg_export_snapshot()") do |res| yield res.getvalue(0, 0), res.getvalue(0, 1) end end |