Class: PG::LogicalReplication::Client
- Inherits:
-
Object
- Object
- PG::LogicalReplication::Client
- Defined in:
- lib/pg/logical_replication/client.rb
Instance Attribute Summary collapse
-
#command_builder ⇒ Object
readonly
Returns the value of attribute command_builder.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Class Method Summary collapse
Instance Method Summary collapse
-
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication.
-
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION.
-
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION.
-
#create_logical_replication_slot(name) ⇒ Object
Creates a logical replication slot.
-
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication.
-
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node.
-
#disable_subscription(name) ⇒ Object
Disables the running subscription.
-
#drop_publication(name, ifexists = false) ⇒ Object
Remove a publication.
-
#drop_replication_slot(name) ⇒ Object
Drops the physical or logical replication slot.
-
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it.
-
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription.
-
#initialize(connection) ⇒ Client
constructor
A new instance of Client.
-
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node.
-
#publications ⇒ Array<String>
Lists the current publications.
- #publishes?(publication_name) ⇒ Boolean
-
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication.
-
#rename_publication(name, new_name) ⇒ Object
Renames a publication.
-
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription.
-
#replication_slots ⇒ Array<String>
Lists the current replication slots.
-
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication.
-
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication.
-
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string.
-
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription.
-
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications.
-
#subscriber?(dbname = nil) ⇒ Boolean
Returns if this database is subscribing to any publications.
-
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions.
-
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher.
-
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication.
-
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node.
Constructor Details
#initialize(connection) ⇒ Client
Returns a new instance of Client.
18 19 20 21 |
# File 'lib/pg/logical_replication/client.rb', line 18 def initialize(connection) @connection = connection @command_builder = PG::LogicalReplication::CommandBuilder.new(connection) end |
Instance Attribute Details
#command_builder ⇒ Object (readonly)
Returns the value of attribute command_builder.
7 8 9 |
# File 'lib/pg/logical_replication/client.rb', line 7 def command_builder @command_builder end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
6 7 8 |
# File 'lib/pg/logical_replication/client.rb', line 6 def connection @connection end |
Class Method Details
.type_map_for_queries(connection) ⇒ Object
9 10 11 |
# File 'lib/pg/logical_replication/client.rb', line 9 def self.type_map_for_queries(connection) @type_map_for_queries ||= PG::BasicTypeMapForQueries.new(connection) end |
.type_map_for_results(connection) ⇒ Object
13 14 15 |
# File 'lib/pg/logical_replication/client.rb', line 13 def self.type_map_for_results(connection) @type_map_for_results ||= PG::BasicTypeMapForResults.new(connection) end |
Instance Method Details
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication
284 285 286 |
# File 'lib/pg/logical_replication/client.rb', line 284 def add_tables_to_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}") end |
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION
308 309 310 311 |
# File 'lib/pg/logical_replication/client.rb', line 308 def (name, ) base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}" typed_exec(command_builder.(base_command, "SET", )) end |
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION
146 147 148 149 |
# File 'lib/pg/logical_replication/client.rb', line 146 def (name, ) base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}" typed_exec(command_builder.(base_command, "SET", )) end |
#create_logical_replication_slot(name) ⇒ Object
Creates a logical replication slot
84 85 86 |
# File 'lib/pg/logical_replication/client.rb', line 84 def create_logical_replication_slot(name) typed_exec("SELECT pg_create_logical_replication_slot(#{connection.escape_literal(name)}, 'pgoutput')") end |
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication
270 271 272 273 274 275 276 277 278 |
# File 'lib/pg/logical_replication/client.rb', line 270 def create_publication(name, all_tables = false, tables = [], = {}) base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}" if all_tables base_command << " FOR ALL TABLES" elsif !tables.empty? base_command << " FOR TABLE #{safe_list(tables)}" end typed_exec(@command_builder.(base_command, "WITH", )) end |
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node
61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pg/logical_replication/client.rb', line 61 def create_subscription(name, conninfo_hash, publications, = {}) [:slot_name] = name if !.key?(:slot_name) && !.key?("slot_name") && (['create_slot'] == false || [:create_slot] == false) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) base_command = <<-SQL CREATE SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}' PUBLICATION #{safe_list(publications)} SQL typed_exec(command_builder.(base_command, "WITH", )) end |
#disable_subscription(name) ⇒ Object
Disables the running subscription
138 139 140 |
# File 'lib/pg/logical_replication/client.rb', line 138 def disable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE") end |
#drop_publication(name, ifexists = false) ⇒ Object
Remove a publication
333 334 335 |
# File 'lib/pg/logical_replication/client.rb', line 333 def drop_publication(name, ifexists = false) typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#drop_replication_slot(name) ⇒ Object
Drops the physical or logical replication slot. Note, you must be on the same database a logical slot was created.
91 92 93 |
# File 'lib/pg/logical_replication/client.rb', line 91 def drop_replication_slot(name) typed_exec("SELECT pg_drop_replication_slot(#{connection.escape_literal(name)})") end |
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it
77 78 79 |
# File 'lib/pg/logical_replication/client.rb', line 77 def drop_subscription(name, ifexists = false) typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription
131 132 133 |
# File 'lib/pg/logical_replication/client.rb', line 131 def enable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE") end |
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node
28 29 30 31 32 33 34 35 36 |
# File 'lib/pg/logical_replication/client.rb', line 28 def lag_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes, application_name FROM pg_stat_replication SQL end |
#publications ⇒ Array<String>
Lists the current publications
245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/pg/logical_replication/client.rb', line 245 def publications typed_exec(<<-SQL) SELECT pubname::TEXT AS name, usename::TEXT AS owner, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication JOIN pg_user ON pubowner = usesysid SQL end |
#publishes?(publication_name) ⇒ Boolean
260 261 262 |
# File 'lib/pg/logical_replication/client.rb', line 260 def publishes?(publication_name) publications.any? { |p| p["name"] == publication_name } end |
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication
300 301 302 |
# File 'lib/pg/logical_replication/client.rb', line 300 def remove_tables_from_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}") end |
#rename_publication(name, new_name) ⇒ Object
Renames a publication
325 326 327 |
# File 'lib/pg/logical_replication/client.rb', line 325 def rename_publication(name, new_name) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription
163 164 165 |
# File 'lib/pg/logical_replication/client.rb', line 163 def rename_subscription(name, new_name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#replication_slots ⇒ Array<String>
Lists the current replication slots
229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/pg/logical_replication/client.rb', line 229 def replication_slots typed_exec(<<-SQL) SELECT slot_name::TEXT, plugin::TEXT, slot_type::TEXT, database::TEXT, temporary, active FROM pg_replication_slots SQL end |
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication
317 318 319 |
# File 'lib/pg/logical_replication/client.rb', line 317 def set_publication_owner(name, owner) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication
292 293 294 |
# File 'lib/pg/logical_replication/client.rb', line 292 def set_publication_tables(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}") end |
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string
99 100 101 102 |
# File 'lib/pg/logical_replication/client.rb', line 99 def set_subscription_conninfo(name, conninfo_hash) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'") end |
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription
155 156 157 |
# File 'lib/pg/logical_replication/client.rb', line 155 def set_subscription_owner(name, owner) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications
109 110 111 112 113 114 115 |
# File 'lib/pg/logical_replication/client.rb', line 109 def set_subscription_publications(name, publications, = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} SET PUBLICATION #{safe_list(publications)} SQL typed_exec(@command_builder.(base_command, "WITH", )) end |
#subscriber?(dbname = nil) ⇒ Boolean
Returns if this database is subscribing to any publications
222 223 224 |
# File 'lib/pg/logical_replication/client.rb', line 222 def subscriber?(dbname = nil) subscriptions(dbname).any? end |
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/pg/logical_replication/client.rb', line 181 def subscriptions(dbname = nil) subscriptions = typed_exec(<<-SQL).to_a SELECT sub.subname::TEXT AS subscription_name, pg_database.datname::TEXT AS database_name, pg_user.usename::TEXT AS owner, COUNT(sub_stat.pid) AS worker_count, sub.subenabled AS enabled, sub.subconninfo AS subscription_dsn, sub.subslotname::TEXT AS slot_name, sub.subpublications AS publications, stat.remote_lsn::TEXT AS remote_replication_lsn, stat.local_lsn::TEXT AS local_replication_lsn FROM pg_subscription AS sub JOIN pg_user ON sub.subowner = usesysid JOIN pg_database ON sub.subdbid = pg_database.oid LEFT JOIN pg_replication_origin_status stat ON concat('pg_', sub.oid) = stat.external_id LEFT JOIN pg_stat_subscription sub_stat ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL GROUP BY sub.subname, pg_database.datname, pg_user.usename, sub.subenabled, sub.subconninfo, sub.subslotname, sub.subpublications, stat.remote_lsn, stat.local_lsn SQL dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions end |
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher
121 122 123 124 125 126 |
# File 'lib/pg/logical_replication/client.rb', line 121 def sync_subscription(name, = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION SQL typed_exec(@command_builder.(base_command, "WITH", )) end |
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication
341 342 343 344 345 346 347 |
# File 'lib/pg/logical_replication/client.rb', line 341 def tables_in_publication(name) typed_exec(<<-SQL, name).values.flatten SELECT tablename::TEXT FROM pg_publication_tables WHERE pubname = $1 SQL end |
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/pg/logical_replication/client.rb', line 43 def wal_retained_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes, slot_name::TEXT FROM pg_replication_slots WHERE plugin = 'pgoutput' SQL end |