Class: PG::LogicalReplication::Client
- Inherits:
-
Object
- Object
- PG::LogicalReplication::Client
- Defined in:
- lib/pg/logical_replication/client.rb
Instance Attribute Summary collapse
-
#command_builder ⇒ Object
readonly
Returns the value of attribute command_builder.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
-
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication.
-
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION.
-
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION.
-
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication.
-
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node.
-
#disable_subscription(name) ⇒ Object
Disables the running subscription.
-
#drop_publication(name, ifexists = false) ⇒ Object
Remove a publication.
-
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it.
-
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription.
-
#initialize(connection) ⇒ Client
constructor
A new instance of Client.
-
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node.
-
#publications ⇒ Array<String>
Lists the current publications.
- #publishes?(publication_name) ⇒ Boolean
-
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication.
-
#rename_publication(name, new_name) ⇒ Object
Renames a publication.
-
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription.
-
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication.
-
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication.
-
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string.
-
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription.
-
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications.
-
#subscriber? ⇒ Boolean
Returns if this database is subscribing to any publications.
-
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions.
-
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher.
-
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication.
-
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node.
Constructor Details
#initialize(connection) ⇒ Client
Returns a new instance of Client.
10 11 12 13 |
# File 'lib/pg/logical_replication/client.rb', line 10 def initialize(connection) @connection = connection @command_builder = PG::LogicalReplication::CommandBuilder.new(connection) end |
Instance Attribute Details
#command_builder ⇒ Object (readonly)
Returns the value of attribute command_builder.
7 8 9 |
# File 'lib/pg/logical_replication/client.rb', line 7 def command_builder @command_builder end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
6 7 8 |
# File 'lib/pg/logical_replication/client.rb', line 6 def connection @connection end |
Instance Method Details
#add_tables_to_publication(name, tables) ⇒ Object
Adds tables to a publication
244 245 246 |
# File 'lib/pg/logical_replication/client.rb', line 244 def add_tables_to_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}") end |
#alter_publication_options(name, options) ⇒ Object
Alters parameters originally set by CREATE PUBLICATION
268 269 270 271 |
# File 'lib/pg/logical_replication/client.rb', line 268 def (name, ) base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}" typed_exec(command_builder.(base_command, "SET", )) end |
#alter_subscription_options(name, options) ⇒ Object
Alters parameters originally set by CREATE SUBSCRIPTION
122 123 124 125 |
# File 'lib/pg/logical_replication/client.rb', line 122 def (name, ) base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}" typed_exec(command_builder.(base_command, "SET", )) end |
#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object
Creates a new publication
230 231 232 233 234 235 236 237 238 |
# File 'lib/pg/logical_replication/client.rb', line 230 def create_publication(name, all_tables = false, tables = [], = {}) base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}" if all_tables base_command << " FOR ALL TABLES" elsif !tables.empty? base_command << " FOR TABLE #{safe_list(tables)}" end typed_exec(@command_builder.(base_command, "WITH", )) end |
#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object
Creates a subscription to a publisher node
53 54 55 56 57 58 59 60 61 |
# File 'lib/pg/logical_replication/client.rb', line 53 def create_subscription(name, conninfo_hash, publications, = {}) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) base_command = <<-SQL CREATE SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}' PUBLICATION #{safe_list(publications)} SQL typed_exec(command_builder.(base_command, "WITH", )) end |
#disable_subscription(name) ⇒ Object
Disables the running subscription
114 115 116 |
# File 'lib/pg/logical_replication/client.rb', line 114 def disable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE") end |
#drop_publication(name, ifexists = false) ⇒ Object
Remove a publication
293 294 295 |
# File 'lib/pg/logical_replication/client.rb', line 293 def drop_publication(name, ifexists = false) typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#drop_subscription(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it
67 68 69 |
# File 'lib/pg/logical_replication/client.rb', line 67 def drop_subscription(name, ifexists = false) typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end |
#enable_subscription(name) ⇒ Object
Enables the previously disabled subscription
107 108 109 |
# File 'lib/pg/logical_replication/client.rb', line 107 def enable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE") end |
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node
20 21 22 23 24 25 26 27 28 |
# File 'lib/pg/logical_replication/client.rb', line 20 def lag_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes, application_name FROM pg_stat_replication SQL end |
#publications ⇒ Array<String>
Lists the current publications
205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/pg/logical_replication/client.rb', line 205 def publications typed_exec(<<-SQL) SELECT pubname::TEXT AS name, usename::TEXT AS owner, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication JOIN pg_user ON pubowner = usesysid SQL end |
#publishes?(publication_name) ⇒ Boolean
220 221 222 |
# File 'lib/pg/logical_replication/client.rb', line 220 def publishes?(publication_name) publications.any? { |p| p["name"] == publication_name } end |
#remove_tables_from_publication(name, tables) ⇒ Object
Removes tables from a publication
260 261 262 |
# File 'lib/pg/logical_replication/client.rb', line 260 def remove_tables_from_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}") end |
#rename_publication(name, new_name) ⇒ Object
Renames a publication
285 286 287 |
# File 'lib/pg/logical_replication/client.rb', line 285 def rename_publication(name, new_name) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#rename_subscription(name, new_name) ⇒ Object
Renames the subscription
139 140 141 |
# File 'lib/pg/logical_replication/client.rb', line 139 def rename_subscription(name, new_name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end |
#set_publication_owner(name, owner) ⇒ Object
Sets the owner of a publication
277 278 279 |
# File 'lib/pg/logical_replication/client.rb', line 277 def set_publication_owner(name, owner) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_publication_tables(name, tables) ⇒ Object
Sets the tables included in a publication
252 253 254 |
# File 'lib/pg/logical_replication/client.rb', line 252 def set_publication_tables(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}") end |
#set_subscription_conninfo(name, conninfo_hash) ⇒ Object
Updates a subscription connection string
75 76 77 78 |
# File 'lib/pg/logical_replication/client.rb', line 75 def set_subscription_conninfo(name, conninfo_hash) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'") end |
#set_subscription_owner(name, owner) ⇒ Object
Sets the owner of the subscription
131 132 133 |
# File 'lib/pg/logical_replication/client.rb', line 131 def set_subscription_owner(name, owner) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end |
#set_subscription_publications(name, publications, options = {}) ⇒ Object
Changes list of subscribed publications
85 86 87 88 89 90 91 |
# File 'lib/pg/logical_replication/client.rb', line 85 def set_subscription_publications(name, publications, = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} SET PUBLICATION #{safe_list(publications)} SQL typed_exec(@command_builder.(base_command, "WITH", )) end |
#subscriber? ⇒ Boolean
Returns if this database is subscribing to any publications
198 199 200 |
# File 'lib/pg/logical_replication/client.rb', line 198 def subscriber? subscriptions.any? end |
#subscriptions(dbname = nil) ⇒ Array<Hash>
Shows status and basic information about all subscriptions
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/pg/logical_replication/client.rb', line 157 def subscriptions(dbname = nil) subscriptions = typed_exec(<<-SQL).to_a SELECT sub.subname::TEXT AS subscription_name, pg_database.datname::TEXT AS database_name, pg_user.usename::TEXT AS owner, COUNT(sub_stat.pid) AS worker_count, sub.subenabled AS enabled, sub.subconninfo AS subscription_dsn, sub.subslotname::TEXT AS slot_name, sub.subpublications AS publications, stat.remote_lsn::TEXT AS remote_replication_lsn, stat.local_lsn::TEXT AS local_replication_lsn FROM pg_subscription AS sub JOIN pg_user ON sub.subowner = usesysid JOIN pg_database ON sub.subdbid = pg_database.oid LEFT JOIN pg_replication_origin_status stat ON concat('pg_', sub.oid) = stat.external_id LEFT JOIN pg_stat_subscription sub_stat ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL GROUP BY sub.subname, pg_database.datname, pg_user.usename, sub.subenabled, sub.subconninfo, sub.subslotname, sub.subpublications, stat.remote_lsn, stat.local_lsn SQL dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions end |
#sync_subscription(name, options = {}) ⇒ Object
Fetch missing table information from publisher
97 98 99 100 101 102 |
# File 'lib/pg/logical_replication/client.rb', line 97 def sync_subscription(name, = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION SQL typed_exec(@command_builder.(base_command, "WITH", )) end |
#tables_in_publication(name) ⇒ Array<String>
Lists the tables currently in the publication
301 302 303 304 305 306 307 |
# File 'lib/pg/logical_replication/client.rb', line 301 def tables_in_publication(name) typed_exec(<<-SQL, name).values.flatten SELECT tablename::TEXT FROM pg_publication_tables WHERE pubname = $1 SQL end |
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/pg/logical_replication/client.rb', line 35 def wal_retained_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes, slot_name::TEXT FROM pg_replication_slots WHERE plugin = 'pgoutput' SQL end |