Class: PG::LogicalReplication::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/logical_replication/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Client



18
19
20
21
# File 'lib/pg/logical_replication/client.rb', line 18

def initialize(connection)
  @connection      = connection
  @command_builder = PG::LogicalReplication::CommandBuilder.new(connection)
end

Instance Attribute Details

#command_builderObject (readonly)

Returns the value of attribute command_builder.



7
8
9
# File 'lib/pg/logical_replication/client.rb', line 7

def command_builder
  @command_builder
end

#connectionObject (readonly)

Returns the value of attribute connection.



6
7
8
# File 'lib/pg/logical_replication/client.rb', line 6

def connection
  @connection
end

Class Method Details

.type_map_for_queries(connection) ⇒ Object



9
10
11
# File 'lib/pg/logical_replication/client.rb', line 9

def self.type_map_for_queries(connection)
  @type_map_for_queries ||= PG::BasicTypeMapForQueries.new(connection)
end

.type_map_for_results(connection) ⇒ Object



13
14
15
# File 'lib/pg/logical_replication/client.rb', line 13

def self.type_map_for_results(connection)
  @type_map_for_results ||= PG::BasicTypeMapForResults.new(connection)
end

Instance Method Details

#add_tables_to_publication(name, tables) ⇒ Object

Adds tables to a publication



284
285
286
# File 'lib/pg/logical_replication/client.rb', line 284

def add_tables_to_publication(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}")
end

#alter_publication_options(name, options) ⇒ Object

Alters parameters originally set by CREATE PUBLICATION



308
309
310
311
# File 'lib/pg/logical_replication/client.rb', line 308

def alter_publication_options(name, options)
  base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}"
  typed_exec(command_builder.command_with_options(base_command, "SET", options))
end

#alter_subscription_options(name, options) ⇒ Object

Alters parameters originally set by CREATE SUBSCRIPTION



146
147
148
149
# File 'lib/pg/logical_replication/client.rb', line 146

def alter_subscription_options(name, options)
  base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}"
  typed_exec(command_builder.command_with_options(base_command, "SET", options))
end

#create_logical_replication_slot(name) ⇒ Object

Creates a logical replication slot



84
85
86
# File 'lib/pg/logical_replication/client.rb', line 84

def create_logical_replication_slot(name)
  typed_exec("SELECT pg_create_logical_replication_slot(#{connection.escape_literal(name)}, 'pgoutput')")
end

#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object

Creates a new publication



270
271
272
273
274
275
276
277
278
# File 'lib/pg/logical_replication/client.rb', line 270

def create_publication(name, all_tables = false, tables = [], options = {})
  base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}"
  if all_tables
    base_command << " FOR ALL TABLES"
  elsif !tables.empty?
    base_command << " FOR TABLE #{safe_list(tables)}"
  end
  typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
end

#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object

Creates a subscription to a publisher node



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pg/logical_replication/client.rb', line 61

def create_subscription(name, conninfo_hash, publications, options = {})
  options[:slot_name] = name if !options.key?(:slot_name) && !options.key?("slot_name") && (options['create_slot'] == false || options[:create_slot] == false)

  connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
  base_command = "    CREATE SUBSCRIPTION \#{connection.quote_ident(name)}\n           CONNECTION '\#{connection_string}'\n           PUBLICATION \#{safe_list(publications)}\n  SQL\n  typed_exec(command_builder.command_with_options(base_command, \"WITH\", options))\nend\n"

#disable_subscription(name) ⇒ Object

Disables the running subscription



138
139
140
# File 'lib/pg/logical_replication/client.rb', line 138

def disable_subscription(name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE")
end

#drop_publication(name, ifexists = false) ⇒ Object

Remove a publication



333
334
335
# File 'lib/pg/logical_replication/client.rb', line 333

def drop_publication(name, ifexists = false)
  typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
end

#drop_replication_slot(name) ⇒ Object

Drops the physical or logical replication slot. Note, you must be on the same database a logical slot was created.



91
92
93
# File 'lib/pg/logical_replication/client.rb', line 91

def drop_replication_slot(name)
  typed_exec("SELECT pg_drop_replication_slot(#{connection.escape_literal(name)})")
end

#drop_subscription(name, ifexists = false) ⇒ Object

Disconnects the subscription and removes it



77
78
79
# File 'lib/pg/logical_replication/client.rb', line 77

def drop_subscription(name, ifexists = false)
  typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
end

#enable_subscription(name) ⇒ Object

Enables the previously disabled subscription



131
132
133
# File 'lib/pg/logical_replication/client.rb', line 131

def enable_subscription(name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE")
end

#lag_bytesArray<Hash<String,String>>

Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node



28
29
30
31
32
33
34
35
36
# File 'lib/pg/logical_replication/client.rb', line 28

def lag_bytes
  typed_exec("    SELECT\n      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,\n      application_name\n    FROM\n      pg_stat_replication\n  SQL\nend\n").to_a

#publicationsArray<String>

Lists the current publications



245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/pg/logical_replication/client.rb', line 245

def publications
  typed_exec("    SELECT\n      pubname::TEXT AS name,\n      usename::TEXT AS owner,\n      puballtables,\n      pubinsert,\n      pubupdate,\n      pubdelete\n    FROM\n      pg_publication\n      JOIN pg_user ON pubowner = usesysid\n  SQL\nend\n")

#publishes?(publication_name) ⇒ Boolean



260
261
262
# File 'lib/pg/logical_replication/client.rb', line 260

def publishes?(publication_name)
  publications.any? { |p| p["name"] == publication_name }
end

#remove_tables_from_publication(name, tables) ⇒ Object

Removes tables from a publication



300
301
302
# File 'lib/pg/logical_replication/client.rb', line 300

def remove_tables_from_publication(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}")
end

#rename_publication(name, new_name) ⇒ Object

Renames a publication



325
326
327
# File 'lib/pg/logical_replication/client.rb', line 325

def rename_publication(name, new_name)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
end

#rename_subscription(name, new_name) ⇒ Object

Renames the subscription



163
164
165
# File 'lib/pg/logical_replication/client.rb', line 163

def rename_subscription(name, new_name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
end

#replication_slotsArray<String>

Lists the current replication slots



229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/pg/logical_replication/client.rb', line 229

def replication_slots
  typed_exec("    SELECT\n      slot_name::TEXT,\n      plugin::TEXT,\n      slot_type::TEXT,\n      database::TEXT,\n      temporary,\n      active\n    FROM pg_replication_slots\n  SQL\nend\n")

#set_publication_owner(name, owner) ⇒ Object

Sets the owner of a publication



317
318
319
# File 'lib/pg/logical_replication/client.rb', line 317

def set_publication_owner(name, owner)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
end

#set_publication_tables(name, tables) ⇒ Object

Sets the tables included in a publication



292
293
294
# File 'lib/pg/logical_replication/client.rb', line 292

def set_publication_tables(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}")
end

#set_subscription_conninfo(name, conninfo_hash) ⇒ Object

Updates a subscription connection string



99
100
101
102
# File 'lib/pg/logical_replication/client.rb', line 99

def set_subscription_conninfo(name, conninfo_hash)
  connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'")
end

#set_subscription_owner(name, owner) ⇒ Object

Sets the owner of the subscription



155
156
157
# File 'lib/pg/logical_replication/client.rb', line 155

def set_subscription_owner(name, owner)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
end

#set_subscription_publications(name, publications, options = {}) ⇒ Object

Changes list of subscribed publications



109
110
111
112
113
114
115
# File 'lib/pg/logical_replication/client.rb', line 109

def set_subscription_publications(name, publications, options = {})
  base_command = "    ALTER SUBSCRIPTION \#{connection.quote_ident(name)}\n    SET PUBLICATION \#{safe_list(publications)}\n  SQL\n  typed_exec(@command_builder.command_with_options(base_command, \"WITH\", options))\nend\n"

#subscriber?(dbname = nil) ⇒ Boolean

Returns if this database is subscribing to any publications



222
223
224
# File 'lib/pg/logical_replication/client.rb', line 222

def subscriber?(dbname = nil)
  subscriptions(dbname).any?
end

#subscriptions(dbname = nil) ⇒ Array<Hash>

Shows status and basic information about all subscriptions



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/pg/logical_replication/client.rb', line 181

def subscriptions(dbname = nil)
  subscriptions = typed_exec("    SELECT\n      sub.subname::TEXT         AS subscription_name,\n      pg_database.datname::TEXT AS database_name,\n      pg_user.usename::TEXT     AS owner,\n      COUNT(sub_stat.pid)       AS worker_count,\n      sub.subenabled            AS enabled,\n      sub.subconninfo           AS subscription_dsn,\n      sub.subslotname::TEXT     AS slot_name,\n      sub.subpublications       AS publications,\n      stat.remote_lsn::TEXT     AS remote_replication_lsn,\n      stat.local_lsn::TEXT      AS local_replication_lsn\n    FROM\n      pg_subscription AS sub\n      JOIN pg_user\n        ON sub.subowner = usesysid\n      JOIN pg_database\n        ON sub.subdbid = pg_database.oid\n      LEFT JOIN pg_replication_origin_status stat\n        ON concat('pg_', sub.oid) = stat.external_id\n      LEFT JOIN pg_stat_subscription sub_stat\n        ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL\n    GROUP BY\n      sub.subname,\n      pg_database.datname,\n      pg_user.usename,\n      sub.subenabled,\n      sub.subconninfo,\n      sub.subslotname,\n      sub.subpublications,\n      stat.remote_lsn,\n      stat.local_lsn\n  SQL\n\n  dbname ? subscriptions.select { |s| s[\"database_name\"] == dbname } : subscriptions\nend\n").to_a

#sync_subscription(name, options = {}) ⇒ Object

Fetch missing table information from publisher



121
122
123
124
125
126
# File 'lib/pg/logical_replication/client.rb', line 121

def sync_subscription(name, options = {})
  base_command = "    ALTER SUBSCRIPTION \#{connection.quote_ident(name)} REFRESH PUBLICATION\n  SQL\n  typed_exec(@command_builder.command_with_options(base_command, \"WITH\", options))\nend\n"

#tables_in_publication(name) ⇒ Array<String>

Lists the tables currently in the publication



341
342
343
344
345
346
347
# File 'lib/pg/logical_replication/client.rb', line 341

def tables_in_publication(name)
  typed_exec("    SELECT tablename::TEXT\n    FROM pg_publication_tables\n    WHERE pubname = $1\n  SQL\nend\n", name).values.flatten

#wal_retained_bytesArray<Hash<String,String>>

Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pg/logical_replication/client.rb', line 43

def wal_retained_bytes
  typed_exec("    SELECT\n      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,\n      slot_name::TEXT\n    FROM\n      pg_replication_slots\n    WHERE\n      plugin = 'pgoutput'\n  SQL\nend\n").to_a