Class: PG::LogicalReplication::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/logical_replication/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Client

Returns a new instance of Client.

Parameters:

  • connection (PG::Connection)

    Database Connection



10
11
12
13
# File 'lib/pg/logical_replication/client.rb', line 10

def initialize(connection)
  @connection      = connection
  @command_builder = PG::LogicalReplication::CommandBuilder.new(connection)
end

Instance Attribute Details

#command_builderObject (readonly)

Returns the value of attribute command_builder.



7
8
9
# File 'lib/pg/logical_replication/client.rb', line 7

def command_builder
  @command_builder
end

#connectionObject (readonly)

Returns the value of attribute connection.



6
7
8
# File 'lib/pg/logical_replication/client.rb', line 6

def connection
  @connection
end

Instance Method Details

#add_tables_to_publication(name, tables) ⇒ Object

Adds tables to a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to add



244
245
246
# File 'lib/pg/logical_replication/client.rb', line 244

def add_tables_to_publication(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}")
end

#alter_publication_options(name, options) ⇒ Object

Alters parameters originally set by CREATE PUBLICATION

Parameters:

  • name (String)

    publication name

  • options (Hash)

    parameters to set



268
269
270
271
# File 'lib/pg/logical_replication/client.rb', line 268

def alter_publication_options(name, options)
  base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}"
  typed_exec(command_builder.command_with_options(base_command, "SET", options))
end

#alter_subscription_options(name, options) ⇒ Object

Alters parameters originally set by CREATE SUBSCRIPTION

Parameters:

  • name (String)

    subscription name

  • options (Hash)

    parameters to set



122
123
124
125
# File 'lib/pg/logical_replication/client.rb', line 122

def alter_subscription_options(name, options)
  base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}"
  typed_exec(command_builder.command_with_options(base_command, "SET", options))
end

#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object

Creates a new publication

Parameters:

  • name (String)

    publication name

  • all_tables (Boolean) (defaults to: false)

    replicate changes for all tables, including ones created in the future

  • tables (Array<String>) (defaults to: [])

    tables to be added to the publication, ignored if all_tables is true

  • options (Hash) (defaults to: {})

    optional parameters



230
231
232
233
234
235
236
237
238
# File 'lib/pg/logical_replication/client.rb', line 230

def create_publication(name, all_tables = false, tables = [], options = {})
  base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}"
  if all_tables
    base_command << " FOR ALL TABLES"
  elsif !tables.empty?
    base_command << " FOR TABLE #{safe_list(tables)}"
  end
  typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
end

#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object

Creates a subscription to a publisher node

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    publisher node connection info

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters for CREATE SUBSCRIPTION



53
54
55
56
57
58
59
60
61
# File 'lib/pg/logical_replication/client.rb', line 53

def create_subscription(name, conninfo_hash, publications, options = {})
  connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
  base_command = <<-SQL
    CREATE SUBSCRIPTION #{connection.quote_ident(name)}
           CONNECTION '#{connection_string}'
           PUBLICATION #{safe_list(publications)}
  SQL
  typed_exec(command_builder.command_with_options(base_command, "WITH", options))
end

#disable_subscription(name) ⇒ Object

Disables the running subscription

Parameters:

  • name (String)

    subscription name



114
115
116
# File 'lib/pg/logical_replication/client.rb', line 114

def disable_subscription(name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE")
end

#drop_publication(name, ifexists = false) ⇒ Object

Remove a publication

Parameters:

  • name (String)

    publication name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the publication does not exist



293
294
295
# File 'lib/pg/logical_replication/client.rb', line 293

def drop_publication(name, ifexists = false)
  typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
end

#drop_subscription(name, ifexists = false) ⇒ Object

Disconnects the subscription and removes it

Parameters:

  • name (String)

    subscription name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the subscription does not exist



67
68
69
# File 'lib/pg/logical_replication/client.rb', line 67

def drop_subscription(name, ifexists = false)
  typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
end

#enable_subscription(name) ⇒ Object

Enables the previously disabled subscription

Parameters:

  • name (String)

    subscription name



107
108
109
# File 'lib/pg/logical_replication/client.rb', line 107

def enable_subscription(name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE")
end

#lag_bytesArray<Hash<String,String>>

Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node

Returns:

  • (Array<Hash<String,String>>)

    List of returned lag and application names, one for each replication process



20
21
22
23
24
25
26
27
28
# File 'lib/pg/logical_replication/client.rb', line 20

def lag_bytes
  typed_exec(<<-SQL).to_a
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,
      application_name
    FROM
      pg_stat_replication
  SQL
end

#publicationsArray<String>

Lists the current publications

Returns:

  • (Array<String>)

    publication names



205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/pg/logical_replication/client.rb', line 205

def publications
  typed_exec(<<-SQL)
    SELECT
      pubname::TEXT AS name,
      usename::TEXT AS owner,
      puballtables,
      pubinsert,
      pubupdate,
      pubdelete
    FROM
      pg_publication
      JOIN pg_user ON pubowner = usesysid
  SQL
end

#publishes?(publication_name) ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/pg/logical_replication/client.rb', line 220

def publishes?(publication_name)
  publications.any? { |p| p["name"] == publication_name }
end

#remove_tables_from_publication(name, tables) ⇒ Object

Removes tables from a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to remove



260
261
262
# File 'lib/pg/logical_replication/client.rb', line 260

def remove_tables_from_publication(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}")
end

#rename_publication(name, new_name) ⇒ Object

Renames a publication

Parameters:

  • name (String)

    current publication name

  • new_name (String)

    new publication name



285
286
287
# File 'lib/pg/logical_replication/client.rb', line 285

def rename_publication(name, new_name)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
end

#rename_subscription(name, new_name) ⇒ Object

Renames the subscription

Parameters:

  • name (String)

    current subscription name

  • new_name (String)

    new subscription name



139
140
141
# File 'lib/pg/logical_replication/client.rb', line 139

def rename_subscription(name, new_name)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
end

#set_publication_owner(name, owner) ⇒ Object

Sets the owner of a publication

Parameters:

  • name (String)

    publication name

  • owner (String)

    new owner user name



277
278
279
# File 'lib/pg/logical_replication/client.rb', line 277

def set_publication_owner(name, owner)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
end

#set_publication_tables(name, tables) ⇒ Object

Sets the tables included in a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names



252
253
254
# File 'lib/pg/logical_replication/client.rb', line 252

def set_publication_tables(name, tables)
  typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}")
end

#set_subscription_conninfo(name, conninfo_hash) ⇒ Object

Updates a subscription connection string

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    new external connection hash to the publisher node



75
76
77
78
# File 'lib/pg/logical_replication/client.rb', line 75

def set_subscription_conninfo(name, conninfo_hash)
  connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'")
end

#set_subscription_owner(name, owner) ⇒ Object

Sets the owner of the subscription

Parameters:

  • name (String)

    subscription name

  • owner (String)

    new owner user name



131
132
133
# File 'lib/pg/logical_replication/client.rb', line 131

def set_subscription_owner(name, owner)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
end

#set_subscription_publications(name, publications, options = {}) ⇒ Object

Changes list of subscribed publications

Parameters:

  • name (String)

    subscription name

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters



85
86
87
88
89
90
91
# File 'lib/pg/logical_replication/client.rb', line 85

def set_subscription_publications(name, publications, options = {})
  base_command = <<-SQL
    ALTER SUBSCRIPTION #{connection.quote_ident(name)}
    SET PUBLICATION #{safe_list(publications)}
  SQL
  typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
end

#subscriber?Boolean

Returns if this database is subscribing to any publications

Returns:

  • (Boolean)

    true if there are any subscriptions, false otherwise



198
199
200
# File 'lib/pg/logical_replication/client.rb', line 198

def subscriber?
  subscriptions.any?
end

#subscriptions(dbname = nil) ⇒ Array<Hash>

Shows status and basic information about all subscriptions

Returns:

  • (Array<Hash>)

    a list of subscriptions keys:

    subscription_name
    database_name
    owner
    worker_count
    enabled
    subscription_dsn
    slot_name
    publications
    remote_replication_lsn
    local_replication_lsn
    


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/pg/logical_replication/client.rb', line 157

def subscriptions(dbname = nil)
  subscriptions = typed_exec(<<-SQL).to_a
    SELECT
      sub.subname::TEXT         AS subscription_name,
      pg_database.datname::TEXT AS database_name,
      pg_user.usename::TEXT     AS owner,
      COUNT(sub_stat.pid)       AS worker_count,
      sub.subenabled            AS enabled,
      sub.subconninfo           AS subscription_dsn,
      sub.subslotname::TEXT     AS slot_name,
      sub.subpublications       AS publications,
      stat.remote_lsn::TEXT     AS remote_replication_lsn,
      stat.local_lsn::TEXT      AS local_replication_lsn
    FROM
      pg_subscription AS sub
      JOIN pg_user
        ON sub.subowner = usesysid
      JOIN pg_database
        ON sub.subdbid = pg_database.oid
      LEFT JOIN pg_replication_origin_status stat
        ON concat('pg_', sub.oid) = stat.external_id
      LEFT JOIN pg_stat_subscription sub_stat
        ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL
    GROUP BY
      sub.subname,
      pg_database.datname,
      pg_user.usename,
      sub.subenabled,
      sub.subconninfo,
      sub.subslotname,
      sub.subpublications,
      stat.remote_lsn,
      stat.local_lsn
  SQL

  dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions
end

#sync_subscription(name, options = {}) ⇒ Object

Fetch missing table information from publisher

Parameters:

  • name (String)

    subscription name

  • options (Hash) (defaults to: {})

    optional parameters



97
98
99
100
101
102
# File 'lib/pg/logical_replication/client.rb', line 97

def sync_subscription(name, options = {})
  base_command = <<-SQL
    ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION
  SQL
  typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
end

#tables_in_publication(name) ⇒ Array<String>

Lists the tables currently in the publication

Parameters:

  • set_name (String)

    publication name

Returns:

  • (Array<String>)

    table names



301
302
303
304
305
306
307
# File 'lib/pg/logical_replication/client.rb', line 301

def tables_in_publication(name)
  typed_exec(<<-SQL, name).values.flatten
    SELECT tablename::TEXT
    FROM pg_publication_tables
    WHERE pubname = $1
  SQL
end

#wal_retained_bytesArray<Hash<String,String>>

Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node

Returns:

  • (Array<Hash<String,String>>)

    List of returned WAL bytes and replication slot names, one for each replication process



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/pg/logical_replication/client.rb', line 35

def wal_retained_bytes
  typed_exec(<<-SQL).to_a
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,
      slot_name::TEXT
    FROM
      pg_replication_slots
    WHERE
      plugin = 'pgoutput'
  SQL
end