Class: LogicalReplicationInitializer

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_logical_replicator/logical_replication_initializer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ LogicalReplicationInitializer



14
15
16
17
18
19
20
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 14

# Stores the supplied options and prepares the instance for use.
#
# @param args [Object] options container indexed by symbol keys
#   (presumably a Hash; validate_options reads it with +[]+)
def initialize(args)
  @options = args

  # Order matters: validate_options assigns the connection strings that
  # connect_to_databases reads.
  %i[setup_logger validate_options connect_to_databases].each { |step| send(step) }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



12
13
14
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 12

# Read-only accessor.
#
# @return [Object] the current value of @options
def options
  @options
end

Instance Method Details

#advance_subscription_origin(lsn) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 180

# Advances the replication origin that belongs to the subscription named
# after @slot, so that it starts from +lsn+.
#
# The origin name is built in SQL as 'pg_' followed by the subscription id
# looked up in pg_stat_subscription.
#
# @param lsn [String] WAL position to advance the origin to
# @return [Object] whatever @target.exec_params returns for the statement
def advance_subscription_origin(lsn)
  @log.info("Setting replication origin position to #{lsn}")

  # Both values travel as bind parameters instead of being spliced into the
  # SQL text, so a quote inside the LSN or the slot name cannot change the
  # statement that is executed.
  query = <<~SQL
    SELECT pg_replication_origin_advance('pg_' || subid::text, $1::pg_lsn)
    FROM pg_stat_subscription
    WHERE subname = $2
  SQL

  @target.exec_params(query, [lsn, @slot])
end

#connect_to_databases ⇒ Object



71
72
73
74
75
76
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 71

# Opens the four database connections (@primary, @secondary, @target,
# @target_rep) from their respective connection strings.
#
# The method can be called more than once on the same instance. Connections
# left over from an earlier call are finished first, so reconnecting does not
# leave the previous sessions open on the servers.
#
# @return [Object] the connection assigned to @target_rep
def connect_to_databases
  [@primary, @secondary, @target, @target_rep].each do |conn|
    conn.finish if conn && !conn.finished?
  end

  @primary    = PG.connect(@primary_conn_str)
  @secondary  = PG.connect(@secondary_conn_str)
  @target     = PG.connect(@target_conn_str)
  @target_rep = PG.connect(@target_rep_conn_str)
end

#create_logical_replication_slot ⇒ Object



103
104
105
106
107
108
109
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 103

# Recreates the logical replication slot named by @slot on the primary,
# using the "pgoutput" plugin, and logs the name reported back.
def create_logical_replication_slot
  drop_sql   = "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1"
  create_sql = "SELECT pg_create_logical_replication_slot($1, $2)"

  # The drop is a SELECT over pg_replication_slots filtered by name, so the
  # drop function is only invoked for rows that match.
  @primary.exec(drop_sql, [@slot])

  @primary.exec(create_sql, [@slot, "pgoutput"]) do |result|
    @log.info("Created logical replication slot #{result.getvalue(0, 0)}")
  end
end

#create_publication ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 78

# Creates the publication named @publication_name on the primary unless a
# publication with that name is already listed in pg_publication.
#
# The publication is created with the first table from the comma-separated
# @table_names; every further table is added with its own ALTER PUBLICATION.
# A failure while adding one of those tables is logged and does not stop the
# remaining tables from being processed.
def create_publication
  @primary.exec("SELECT COUNT(*) FROM pg_publication WHERE pubname = $1", [@publication_name]) do |result|
    existing = result.getvalue(0, 0).to_i
    next @log.info("Publication #{@publication_name} already exists") unless existing.zero?

    quoted_pub = PG::Connection.quote_ident(@publication_name)
    first_table, *other_tables = @table_names.split(',')

    @primary.exec("CREATE PUBLICATION #{quoted_pub} FOR TABLE #{PG::Connection.quote_ident(first_table)}")

    other_tables.each do |name|
      @log.info("Processing table: #{name}")
      @primary.exec("ALTER PUBLICATION #{quoted_pub} ADD TABLE #{PG::Connection.quote_ident(name)}")
    rescue StandardError => e
      @log.error("Error adding table #{name}: #{e.message}")
    end

    @log.info("Created publication #{@publication_name}")
  end
end

#create_subscription ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 164

# Creates a disabled subscription on the target, named after @slot, that
# subscribes to @publication_name over @subscription_conn_str.
#
# The subscription reuses the slot of the same name (create_slot=false) and
# does not copy existing data (copy_data=false).
#
# @return [Object] whatever @target.exec returns for the statement
def create_subscription
  @log.info("Creating subscription #{@slot}")

  pub_name = PG::Connection.quote_ident(@publication_name)
  sub_name = PG::Connection.quote_ident(@slot)

  # The statement is assembled as text, so the connection string is converted
  # into a properly quoted SQL literal; a single quote inside it (for example
  # in a quoted password) would otherwise terminate the literal early.
  conninfo = @target.escape_literal(@subscription_conn_str)

  query = <<~SQL
    CREATE SUBSCRIPTION #{sub_name}
    CONNECTION #{conninfo}
    PUBLICATION #{pub_name}
    WITH (create_slot=false, slot_name=#{sub_name}, enabled=false, copy_data=false)
  SQL

  @target.exec(query)
end

#drop_publication ⇒ Object



197
198
199
200
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 197

# Drops the publication named @publication_name on the primary if it exists.
#
# @return [Object] whatever @primary.exec returns for the statement
def drop_publication
  @log.info("Dropping publication #{@publication_name}")

  quoted_pub = PG::Connection.quote_ident(@publication_name)
  @primary.exec("DROP PUBLICATION IF EXISTS #{quoted_pub}")
end

#drop_subscription ⇒ Object



202
203
204
205
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 202

# Drops the subscription named after @slot on the target if it exists.
#
# @return [Object] whatever @target.exec returns for the statement
def drop_subscription
  @log.info("Dropping subscription #{@slot}")

  quoted_sub = PG::Connection.quote_ident(@slot)
  @target.exec("DROP SUBSCRIPTION IF EXISTS #{quoted_sub}")
end

#dump(snapshot) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 119

# Runs pg_dump against the secondary connection string, restricted to the
# tables in the comma-separated @table_names, writing a data-only
# directory-format dump to @dump_dir.
#
# @param snapshot [String] snapshot identifier handed to pg_dump via --snapshot
# @return [nil] when pg_dump exits successfully
# @raise [RuntimeError] "pg_dump failed" when pg_dump exits unsuccessfully;
#   its stderr is logged first
def dump(snapshot)
  @log.info("Dumping source DB")

  cmd = [
    'pg_dump',
    '--no-publication',
    '--no-subscription',
    "--snapshot=#{snapshot}",
    '--format=d',
    '--data-only',
    '--jobs=8',
    '-f', @dump_dir
  ]

  @table_names.split(',').each do |table|
    @log.info("Dumping table #{table}")
    cmd << '-t' << table
  end

  # The connection string is kept out of the log because it may carry a
  # password.
  @log.info("Executing DB Dump Command: #{cmd.join(' ')} [connection string omitted]")

  # pg_dump's synopsis places dbname after all options, so it is appended last;
  # options that follow a positional argument are not guaranteed to be parsed
  # on every platform.
  _stdout, stderr, status = Open3.capture3(*cmd, @secondary_conn_str)
  return if status.success?

  @log.error("Error in pg_dump: #{stderr}")
  raise "pg_dump failed"
end

#enable_subscription ⇒ Object



192
193
194
195
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 192

# Enables the subscription named after @slot on the target.
#
# @return [Object] whatever @target.exec returns for the statement
def enable_subscription
  @log.info("Enabling subscription #{@slot}")

  quoted_sub = PG::Connection.quote_ident(@slot)
  @target.exec("ALTER SUBSCRIPTION #{quoted_sub} ENABLE")
end

#restore_dump ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 152

# Truncates every table listed in the :table_names option, then loads the
# dump directory @dump_dir into the database addressed by
# @target_rep_conn_str using pg_restore.
#
# @return [nil] when pg_restore exits successfully
# @raise [RuntimeError] "pg_restore failed" when pg_restore exits
#   unsuccessfully; its stderr is logged first
def restore_dump
  @options[:table_names].split(',').each { |name| truncate_table(name) }

  @log.info("Restoring dump to target DB")

  restore_cmd = ['pg_restore', '--format=d', '--jobs=8', '--data-only', '-d', @target_rep_conn_str, @dump_dir]
  _out, err_output, status = Open3.capture3(*restore_cmd)
  return if status.success?

  @log.error("Error in pg_restore: #{err_output}")
  raise "pg_restore failed"
end

#setup_logger ⇒ Object



22
23
24
25
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 22

# Installs a Logger writing to STDOUT in @log and sets its threshold to INFO.
#
# @return [Integer] Logger::INFO
def setup_logger
  stdout_logger = Logger.new(STDOUT)
  @log = stdout_logger
  stdout_logger.level = Logger::INFO
end

#start ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 43

# Runs the whole initialisation sequence: connect, drop any previous
# subscription and publication, create the publication and replication slot,
# dump inside an exported snapshot, restore, create the subscription, advance
# its origin to the LSN captured with the snapshot, and enable it.
#
# Any StandardError raised along the way is logged (message only) and the
# process exits with status 1.
#
# @return [Object] the result of enable_subscription on success
def start
  @log.info("Dumping to #{@dump_dir}")
  connect_to_databases

  drop_subscription
  drop_publication
  create_publication
  create_logical_replication_slot

  # with_snapshot hands back the block's value, i.e. the LSN it yielded.
  start_lsn = with_snapshot do |snapshot_lsn, snapshot_id|
    @log.info("Creating Snapshot with lsn #{snapshot_lsn}")
    dump(snapshot_id)
    snapshot_lsn
  end

  restore_dump
  create_subscription
  advance_subscription_origin(start_lsn)
  enable_subscription
rescue StandardError => e
  @log.error(e.message)
  exit 1
end

#truncate_table(table_name) ⇒ Object



148
149
150
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 148

# Empties one table through the @target_rep connection.
#
# @param table_name [String] table to truncate; quoted as a single identifier
# @return [Object] whatever @target_rep.exec returns for the statement
def truncate_table(table_name)
  quoted_table = PG::Connection.quote_ident(table_name)
  @target_rep.exec("TRUNCATE TABLE #{quoted_table}")
end

#validate_options ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 27

# Checks that every mandatory option is present and copies the options into
# instance variables.
#
# Mandatory keys: :primary_conn_str, :target_conn_str, :target_rep_conn_str,
# :table_names, :dump_dir. :secondary_conn_str and :subscription_conn_str
# fall back to the primary connection string; :slot_name and
# :publication_name are copied without any check.
#
# @raise [OptionParser::MissingArgument] naming the first mandatory key
#   (in the order above) whose value is nil or false
# @return [Object] the value of the :publication_name option
def validate_options
  required_keys = %i[primary_conn_str target_conn_str target_rep_conn_str table_names dump_dir]
  missing_key   = required_keys.find { |key| !@options[key] }
  raise OptionParser::MissingArgument, missing_key.to_s if missing_key

  # Every mandatory value is known to be present from here on.
  opts = options

  @primary_conn_str      = opts[:primary_conn_str]
  @secondary_conn_str    = opts[:secondary_conn_str] || @primary_conn_str
  @target_conn_str       = opts[:target_conn_str]
  @target_rep_conn_str   = opts[:target_rep_conn_str]
  @table_names           = opts[:table_names]
  @subscription_conn_str = opts[:subscription_conn_str] || @primary_conn_str
  @dump_dir              = opts[:dump_dir]
  @slot                  = opts[:slot_name]
  @publication_name      = opts[:publication_name]
end

#with_snapshot ⇒ Object



111
112
113
114
115
116
117
# File 'lib/pg_logical_replicator/logical_replication_initializer.rb', line 111

# Opens a REPEATABLE READ transaction on @secondary, exports a snapshot from
# it, and yields the current WAL position together with the snapshot
# identifier. The transaction is ended once the block has finished.
#
# The WAL position is the last replayed LSN when the server reports it is in
# recovery, otherwise the current WAL LSN.
#
# @yieldparam lsn [String] WAL position read in the same query as the snapshot export
# @yieldparam snapshot [String] identifier returned by pg_export_snapshot()
# @return [Object] whatever @secondary.exec returns for the block call
#   (the caller in this file relies on it being the block's value)
def with_snapshot
  @secondary.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")

  begin
    @secondary.exec("SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END, pg_export_snapshot()") do |res|
      yield res.getvalue(0, 0), res.getvalue(0, 1)
    end
  ensure
    # The exported snapshot is only needed while the block runs. Ending the
    # read-only transaction here, including when the block raises, avoids
    # leaving it open on the source for the rest of the session.
    @secondary.exec("ROLLBACK")
  end
end