Class: PG::LogicalReplication::Client
- Inherits:
-
Object
- Object
- PG::LogicalReplication::Client
- Defined in:
- lib/pg/logical_replication/client.rb
Instance Attribute Summary collapse
-
#command_builder ⇒ Object
readonly
Returns the value of attribute command_builder.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Class Method Summary collapse
Instance Method Summary collapse
-
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication.
-
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION.
-
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION.
-
#create_logical_replication_slot(name) ⇒ Object
Creates a logical replication slot.
-
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication.
-
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node.
-
#disable_subscription(name) ⇒ Object
Disables the running subscription.
-
#drop_publication(name, ifexists = false) ⇒ Object
Removes a publication.
-
#drop_replication_slot(name) ⇒ Object
Drops the physical or logical replication slot.
-
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it.
-
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription.
-
#initialize(connection) ⇒ Client
constructor
A new instance of Client.
-
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes. This method must be run on the publisher node.
-
#publications ⇒ Array<String>
Lists the current publications.
- #publishes?(publication_name) ⇒ Boolean
-
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication.
-
#rename_publication(name, new_name) ⇒ Object
Renames a publication.
-
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription.
-
#replication_slots ⇒ Array<String>
Lists the current replication slots.
-
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication.
-
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication.
-
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string.
-
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription.
-
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications.
-
#subscriber?(dbname = nil) ⇒ Boolean
Returns whether this database is subscribing to any publications.
-
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions.
-
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher.
-
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication.
-
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports the number of bytes of WAL being retained for each replication slot. This method must be run on the publisher node.
Constructor Details
#initialize(connection) ⇒ Client
Returns a new instance of Client.
18 19 20 21 |
# File 'lib/pg/logical_replication/client.rb', line 18 def initialize(connection) @connection = connection @command_builder = PG::LogicalReplication::CommandBuilder.new(connection) end |
Instance Attribute Details
#command_builder ⇒ Object (readonly)
Returns the value of attribute command_builder.
7 8 9 |
# File 'lib/pg/logical_replication/client.rb', line 7 def command_builder @command_builder end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
6 7 8 |
# File 'lib/pg/logical_replication/client.rb', line 6 def connection @connection end |
Class Method Details
.type_map_for_queries(connection) ⇒ Object
9 10 11 |
# File 'lib/pg/logical_replication/client.rb', line 9 def self.type_map_for_queries(connection) @type_map_for_queries ||= PG::BasicTypeMapForQueries.new(connection) end |
.type_map_for_results(connection) ⇒ Object
13 14 15 |
# File 'lib/pg/logical_replication/client.rb', line 13 def self.type_map_for_results(connection) @type_map_for_results ||= PG::BasicTypeMapForResults.new(connection) end |
Instance Method Details
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication
282 283 284 |
# File 'lib/pg/logical_replication/client.rb', line 282 def add_tables_to_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}") end |
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION
306 307 308 309 |
# File 'lib/pg/logical_replication/client.rb', line 306 def alter_publication_options(name, options) base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}" typed_exec(command_builder.command_with_options(base_command, "SET", options)) end |
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION
144 145 146 147 |
# File 'lib/pg/logical_replication/client.rb', line 144 def alter_subscription_options(name, options) base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}" typed_exec(command_builder.command_with_options(base_command, "SET", options)) end |
#create_logical_replication_slot(name) ⇒ Object
Creates a logical replication slot
82 83 84 |
# File 'lib/pg/logical_replication/client.rb', line 82 def create_logical_replication_slot(name) typed_exec("SELECT pg_create_logical_replication_slot(#{connection.escape_literal(name)}, 'pgoutput')") end |
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication
268 269 270 271 272 273 274 275 276 |
# File 'lib/pg/logical_replication/client.rb', line 268 def create_publication(name, all_tables = false, tables = [], options = {}) base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}" if all_tables base_command << " FOR ALL TABLES" elsif !tables.empty? base_command << " FOR TABLE #{safe_list(tables)}" end typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end |
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node
61 62 63 64 65 66 67 68 69 |
# File 'lib/pg/logical_replication/client.rb', line 61 def create_subscription(name, conninfo_hash, publications, options = {}) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) base_command = <<-SQL CREATE SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}' PUBLICATION #{safe_list(publications)} SQL typed_exec(command_builder.command_with_options(base_command, "WITH", options)) end |
#disable_subscription(name) ⇒ Object
Disables the running subscription
136 137 138 |
# File 'lib/pg/logical_replication/client.rb', line 136 def disable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE") end |
#drop_publication(name, ifexists = false) ⇒ Object
Removes a publication
331 332 333 |
# File 'lib/pg/logical_replication/client.rb', line 331 def drop_publication(name, ifexists = false) typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#drop_replication_slot(name) ⇒ Object
Drops the physical or logical replication slot. Note: to drop a logical slot, you must be connected to the same database in which it was created.
89 90 91 |
# File 'lib/pg/logical_replication/client.rb', line 89 def drop_replication_slot(name) typed_exec("SELECT pg_drop_replication_slot(#{connection.escape_literal(name)})") end |
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it
75 76 77 |
# File 'lib/pg/logical_replication/client.rb', line 75 def drop_subscription(name, ifexists = false) typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription
129 130 131 |
# File 'lib/pg/logical_replication/client.rb', line 129 def enable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE") end |
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes. This method must be run on the publisher node.
28 29 30 31 32 33 34 35 36 |
# File 'lib/pg/logical_replication/client.rb', line 28 def lag_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes, application_name FROM pg_stat_replication SQL end |
#publications ⇒ Array<String>
Lists the current publications
243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/pg/logical_replication/client.rb', line 243 def publications typed_exec(<<-SQL) SELECT pubname::TEXT AS name, usename::TEXT AS owner, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication JOIN pg_user ON pubowner = usesysid SQL end |
#publishes?(publication_name) ⇒ Boolean
258 259 260 |
# File 'lib/pg/logical_replication/client.rb', line 258 def publishes?(publication_name) publications.any? { |p| p["name"] == publication_name } end |
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication
298 299 300 |
# File 'lib/pg/logical_replication/client.rb', line 298 def remove_tables_from_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}") end |
#rename_publication(name, new_name) ⇒ Object
Renames a publication
323 324 325 |
# File 'lib/pg/logical_replication/client.rb', line 323 def rename_publication(name, new_name) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription
161 162 163 |
# File 'lib/pg/logical_replication/client.rb', line 161 def rename_subscription(name, new_name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#replication_slots ⇒ Array<String>
Lists the current replication slots
227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/pg/logical_replication/client.rb', line 227 def replication_slots typed_exec(<<-SQL) SELECT slot_name::TEXT, plugin::TEXT, slot_type::TEXT, database::TEXT, temporary, active FROM pg_replication_slots SQL end |
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication
315 316 317 |
# File 'lib/pg/logical_replication/client.rb', line 315 def set_publication_owner(name, owner) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication
290 291 292 |
# File 'lib/pg/logical_replication/client.rb', line 290 def set_publication_tables(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}") end |
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string
97 98 99 100 |
# File 'lib/pg/logical_replication/client.rb', line 97 def set_subscription_conninfo(name, conninfo_hash) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'") end |
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription
153 154 155 |
# File 'lib/pg/logical_replication/client.rb', line 153 def set_subscription_owner(name, owner) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications
107 108 109 110 111 112 113 |
# File 'lib/pg/logical_replication/client.rb', line 107 def set_subscription_publications(name, publications, options = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} SET PUBLICATION #{safe_list(publications)} SQL typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end |
#subscriber?(dbname = nil) ⇒ Boolean
Returns whether this database is subscribing to any publications
220 221 222 |
# File 'lib/pg/logical_replication/client.rb', line 220 def subscriber?(dbname = nil) subscriptions(dbname).any? end |
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/pg/logical_replication/client.rb', line 179 def subscriptions(dbname = nil) subscriptions = typed_exec(<<-SQL).to_a SELECT sub.subname::TEXT AS subscription_name, pg_database.datname::TEXT AS database_name, pg_user.usename::TEXT AS owner, COUNT(sub_stat.pid) AS worker_count, sub.subenabled AS enabled, sub.subconninfo AS subscription_dsn, sub.subslotname::TEXT AS slot_name, sub.subpublications AS publications, stat.remote_lsn::TEXT AS remote_replication_lsn, stat.local_lsn::TEXT AS local_replication_lsn FROM pg_subscription AS sub JOIN pg_user ON sub.subowner = usesysid JOIN pg_database ON sub.subdbid = pg_database.oid LEFT JOIN pg_replication_origin_status stat ON concat('pg_', sub.oid) = stat.external_id LEFT JOIN pg_stat_subscription sub_stat ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL GROUP BY sub.subname, pg_database.datname, pg_user.usename, sub.subenabled, sub.subconninfo, sub.subslotname, sub.subpublications, stat.remote_lsn, stat.local_lsn SQL dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions end |
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher
119 120 121 122 123 124 |
# File 'lib/pg/logical_replication/client.rb', line 119 def sync_subscription(name, options = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION SQL typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end |
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication
339 340 341 342 343 344 345 |
# File 'lib/pg/logical_replication/client.rb', line 339 def tables_in_publication(name) typed_exec(<<-SQL, name).values.flatten SELECT tablename::TEXT FROM pg_publication_tables WHERE pubname = $1 SQL end |
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports the number of bytes of WAL being retained for each replication slot. This method must be run on the publisher node.
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/pg/logical_replication/client.rb', line 43 def wal_retained_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes, slot_name::TEXT FROM pg_replication_slots WHERE plugin = 'pgoutput' SQL end |