Class: PG::LogicalReplication::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/logical_replication/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Client

Returns a new instance of Client.

Parameters:

  • connection (PG::Connection)

    Database Connection



18
19
20
21
# File 'lib/pg/logical_replication/client.rb', line 18

# Wraps an existing database connection and prepares the command builder
# that the option-taking methods of this client use.
#
# @param connection [PG::Connection] Database Connection
def initialize(connection)
  @connection = connection
  builder_class = PG::LogicalReplication::CommandBuilder
  @command_builder = builder_class.new(@connection)
end

Instance Attribute Details

#command_builderObject (readonly)

Returns the value of attribute command_builder.



7
8
9
# File 'lib/pg/logical_replication/client.rb', line 7

# Read-only accessor for the @command_builder instance variable, which the
# constructor sets to a PG::LogicalReplication::CommandBuilder.
#
# @return [Object] the value of @command_builder
def command_builder
  @command_builder
end

#connectionObject (readonly)

Returns the value of attribute connection.



6
7
8
# File 'lib/pg/logical_replication/client.rb', line 6

# Read-only accessor for the @connection instance variable, i.e. the
# connection object that was passed to the constructor.
#
# @return [Object] the value of @connection
def connection
  @connection
end

Class Method Details

.type_map_for_queries(connection) ⇒ Object



9
10
11
# File 'lib/pg/logical_replication/client.rb', line 9

# Returns a PG::BasicTypeMapForQueries built for +connection+.
#
# The map is memoized at class level, but only for the connection object it
# was built from: a call with a different connection object rebuilds the map
# instead of handing back one that was created for another connection
# (the previous `||=` memoization ignored the argument after the first call).
#
# @param connection [PG::Connection] connection the type map is built for
# @return [PG::BasicTypeMapForQueries]
def self.type_map_for_queries(connection)
  unless @type_map_for_queries && @type_map_for_queries_connection.equal?(connection)
    # Build first, record the owner second, so a failed build leaves the
    # previously cached map/connection pair untouched.
    @type_map_for_queries            = PG::BasicTypeMapForQueries.new(connection)
    @type_map_for_queries_connection = connection
  end
  @type_map_for_queries
end

.type_map_for_results(connection) ⇒ Object



13
14
15
# File 'lib/pg/logical_replication/client.rb', line 13

# Returns a PG::BasicTypeMapForResults built for +connection+.
#
# The map is memoized at class level, but only for the connection object it
# was built from: a call with a different connection object rebuilds the map
# instead of handing back one that was created for another connection
# (the previous `||=` memoization ignored the argument after the first call).
#
# @param connection [PG::Connection] connection the type map is built for
# @return [PG::BasicTypeMapForResults]
def self.type_map_for_results(connection)
  unless @type_map_for_results && @type_map_for_results_connection.equal?(connection)
    # Build first, record the owner second, so a failed build leaves the
    # previously cached map/connection pair untouched.
    @type_map_for_results            = PG::BasicTypeMapForResults.new(connection)
    @type_map_for_results_connection = connection
  end
  @type_map_for_results
end

Instance Method Details

#add_tables_to_publication(name, tables) ⇒ Object

Adds tables to a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to add



284
285
286
# File 'lib/pg/logical_replication/client.rb', line 284

# Issues ALTER PUBLICATION ... ADD TABLE for the given tables.
#
# @param name [String] publication name (quoted as an identifier)
# @param tables [Array<String>] table names to add (rendered via safe_list)
# @return the result of typed_exec
def add_tables_to_publication(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} ADD TABLE #{table_list}")
end

#alter_publication_options(name, options) ⇒ Object

Alters parameters originally set by CREATE PUBLICATION

Parameters:

  • name (String)

    publication name

  • options (Hash)

    parameters to set



308
309
310
311
# File 'lib/pg/logical_replication/client.rb', line 308

# Issues ALTER PUBLICATION ... SET with the given options; the options
# clause is produced by the command builder.
#
# @param name [String] publication name (quoted as an identifier)
# @param options [Hash] parameters to set
# @return the result of typed_exec
def alter_publication_options(name, options)
  publication = connection.quote_ident(name)
  statement   = command_builder.command_with_options("ALTER PUBLICATION #{publication}", "SET", options)
  typed_exec(statement)
end

#alter_subscription_options(name, options) ⇒ Object

Alters parameters originally set by CREATE SUBSCRIPTION

Parameters:

  • name (String)

    subscription name

  • options (Hash)

    parameters to set



146
147
148
149
# File 'lib/pg/logical_replication/client.rb', line 146

# Issues ALTER SUBSCRIPTION ... SET with the given options; the options
# clause is produced by the command builder.
#
# @param name [String] subscription name (quoted as an identifier)
# @param options [Hash] parameters to set
# @return the result of typed_exec
def alter_subscription_options(name, options)
  subscription = connection.quote_ident(name)
  statement    = command_builder.command_with_options("ALTER SUBSCRIPTION #{subscription}", "SET", options)
  typed_exec(statement)
end

#create_logical_replication_slot(name) ⇒ Object

Creates a logical replication slot

Parameters:

  • name (String)

    logical replication slot name



84
85
86
# File 'lib/pg/logical_replication/client.rb', line 84

# Calls pg_create_logical_replication_slot with the 'pgoutput' plugin.
#
# @param name [String] logical replication slot name (escaped as a literal)
# @return the result of typed_exec
def create_logical_replication_slot(name)
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_create_logical_replication_slot(#{slot_literal}, 'pgoutput')")
end

#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object

Creates a new publication

Parameters:

  • name (String)

    publication name

  • all_tables (Boolean) (defaults to: false)

    replicate changes for all tables, including ones created in the future

  • tables (Array<String>) (defaults to: [])

    tables to be added to the publication, ignored if all_tables is true

  • options (Hash) (defaults to: {})

    optional parameters



270
271
272
273
274
275
276
277
278
# File 'lib/pg/logical_replication/client.rb', line 270

# Builds and runs a CREATE PUBLICATION statement.
#
# The statement gets " FOR ALL TABLES" when all_tables is truthy, otherwise
# " FOR TABLE <list>" when tables is non-empty, otherwise no table clause.
# The WITH clause is produced by the command builder from options.
#
# @param name [String] publication name (quoted as an identifier)
# @param all_tables [Boolean] publish every table; takes precedence over tables
# @param tables [Array<String>] tables to publish when all_tables is false
# @param options [Hash] optional parameters
# @return the result of typed_exec
def create_publication(name, all_tables = false, tables = [], options = {})
  head = "CREATE PUBLICATION #{connection.quote_ident(name)}"
  scope =
    if all_tables
      " FOR ALL TABLES"
    elsif tables.empty?
      ""
    else
      " FOR TABLE #{safe_list(tables)}"
    end
  typed_exec(@command_builder.command_with_options("#{head}#{scope}", "WITH", options))
end

#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object

Creates a subscription to a publisher node

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    publisher node connection info

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters for CREATE SUBSCRIPTION



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pg/logical_replication/client.rb', line 61

# Builds and runs a CREATE SUBSCRIPTION statement.
#
# When the caller turns slot creation off (create_slot is false under a
# String or Symbol key) and gives no slot_name under either key type, the
# subscription name is used as the slot name. That default is applied to a
# copy of options, so the caller's hash is never modified and frozen hashes
# are accepted.
#
# @param name [String] subscription name (quoted as an identifier)
# @param conninfo_hash [Hash] publisher node connection info
# @param publications [Array<String>] publication names to subscribe to
# @param options [Hash] optional parameters for CREATE SUBSCRIPTION
# @return the result of typed_exec
def create_subscription(name, conninfo_hash, publications, options = {})
  slot_named   = options.key?(:slot_name) || options.key?("slot_name")
  # Explicit comparison with false: a missing key (nil) must not count.
  slot_skipped = options["create_slot"] == false || options[:create_slot] == false
  # merge returns a new hash, leaving the caller's options untouched.
  options = options.merge(:slot_name => name) if !slot_named && slot_skipped

  connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
  base_command = <<-SQL
    CREATE SUBSCRIPTION #{connection.quote_ident(name)}
           CONNECTION '#{connection_string}'
           PUBLICATION #{safe_list(publications)}
  SQL
  typed_exec(command_builder.command_with_options(base_command, "WITH", options))
end

#disable_subscription(name) ⇒ Object

Disables the running subscription

Parameters:

  • name (String)

    subscription name



138
139
140
# File 'lib/pg/logical_replication/client.rb', line 138

# Issues ALTER SUBSCRIPTION ... DISABLE.
#
# @param name [String] subscription name (quoted as an identifier)
# @return the result of typed_exec
def disable_subscription(name)
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} DISABLE")
end

#drop_publication(name, ifexists = false) ⇒ Object

Remove a publication

Parameters:

  • name (String)

    publication name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the publication does not exist



333
334
335
# File 'lib/pg/logical_replication/client.rb', line 333

# Issues DROP PUBLICATION, optionally with IF EXISTS.
#
# @param name [String] publication name (quoted as an identifier)
# @param ifexists [Boolean] when truthy, " IF EXISTS" is added to the statement
# @return the result of typed_exec
def drop_publication(name, ifexists = false)
  guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP PUBLICATION#{guard} #{connection.quote_ident(name)}")
end

#drop_replication_slot(name) ⇒ Object

Drops the physical or logical replication slot. Note: to drop a logical slot, you must be connected to the same database in which it was created.

Parameters:

  • name (String)

    replication slot name



91
92
93
# File 'lib/pg/logical_replication/client.rb', line 91

# Calls pg_drop_replication_slot for the named slot.
#
# @param name [String] replication slot name (escaped as a literal)
# @return the result of typed_exec
def drop_replication_slot(name)
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_drop_replication_slot(#{slot_literal})")
end

#drop_subscription(name, ifexists = false) ⇒ Object

Disconnects the subscription and removes it

Parameters:

  • name (String)

    subscription name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the subscription does not exist



77
78
79
# File 'lib/pg/logical_replication/client.rb', line 77

# Issues DROP SUBSCRIPTION, optionally with IF EXISTS.
#
# @param name [String] subscription name (quoted as an identifier)
# @param ifexists [Boolean] when truthy, " IF EXISTS" is added to the statement
# @return the result of typed_exec
def drop_subscription(name, ifexists = false)
  guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP SUBSCRIPTION#{guard} #{connection.quote_ident(name)}")
end

#enable_subscription(name) ⇒ Object

Enables the previously disabled subscription

Parameters:

  • name (String)

    subscription name



131
132
133
# File 'lib/pg/logical_replication/client.rb', line 131

# Issues ALTER SUBSCRIPTION ... ENABLE.
#
# @param name [String] subscription name (quoted as an identifier)
# @return the result of typed_exec
def enable_subscription(name)
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} ENABLE")
end

#lag_bytesArray<Hash<String,String>>

Reports on replication lag from publisher to subscriber nodes. This method must be run on the publisher node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned lag and application names, one for each replication process



28
29
30
31
32
33
34
35
36
# File 'lib/pg/logical_replication/client.rb', line 28

# Queries pg_stat_replication for the byte distance between the current WAL
# insert LSN and each row's flush_lsn, together with the application_name.
#
# @return [Array] the typed_exec result converted with to_a; rows carry the
#   columns lag_bytes and application_name
def lag_bytes
  lag_query = <<-SQL
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,
      application_name
    FROM
      pg_stat_replication
  SQL
  typed_exec(lag_query).to_a
end

#publicationsArray<String>

Lists the current publications

Returns:

  • (Array<String>)

    publication names



245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/pg/logical_replication/client.rb', line 245

# Selects every row of pg_publication joined to pg_user for the owner name.
#
# @return the typed_exec result as-is (no to_a); rows carry the columns
#   name, owner, puballtables, pubinsert, pubupdate and pubdelete
def publications
  publication_query = <<-SQL
    SELECT
      pubname::TEXT AS name,
      usename::TEXT AS owner,
      puballtables,
      pubinsert,
      pubupdate,
      pubdelete
    FROM
      pg_publication
      JOIN pg_user ON pubowner = usesysid
  SQL
  typed_exec(publication_query)
end

#publishes?(publication_name) ⇒ Boolean

Returns:

  • (Boolean)


260
261
262
# File 'lib/pg/logical_replication/client.rb', line 260

# Tells whether a publication with the given name is among the rows
# returned by #publications (compared against each row's "name" value).
#
# @param publication_name [String] publication name to look for
# @return [Boolean]
def publishes?(publication_name)
  known_names = publications.map { |row| row["name"] }
  known_names.include?(publication_name)
end

#remove_tables_from_publication(name, tables) ⇒ Object

Removes tables from a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to remove



300
301
302
# File 'lib/pg/logical_replication/client.rb', line 300

# Issues ALTER PUBLICATION ... DROP TABLE for the given tables.
#
# @param name [String] publication name (quoted as an identifier)
# @param tables [Array<String>] table names to remove (rendered via safe_list)
# @return the result of typed_exec
def remove_tables_from_publication(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} DROP TABLE #{table_list}")
end

#rename_publication(name, new_name) ⇒ Object

Renames a publication

Parameters:

  • name (String)

    current publication name

  • new_name (String)

    new publication name



325
326
327
# File 'lib/pg/logical_replication/client.rb', line 325

# Issues ALTER PUBLICATION ... RENAME TO.
#
# @param name [String] current publication name (quoted as an identifier)
# @param new_name [String] new publication name (quoted as an identifier)
# @return the result of typed_exec
def rename_publication(name, new_name)
  current     = connection.quote_ident(name)
  replacement = connection.quote_ident(new_name)
  typed_exec("ALTER PUBLICATION #{current} RENAME TO #{replacement}")
end

#rename_subscription(name, new_name) ⇒ Object

Renames the subscription

Parameters:

  • name (String)

    current subscription name

  • new_name (String)

    new subscription name



163
164
165
# File 'lib/pg/logical_replication/client.rb', line 163

# Issues ALTER SUBSCRIPTION ... RENAME TO.
#
# @param name [String] current subscription name (quoted as an identifier)
# @param new_name [String] new subscription name (quoted as an identifier)
# @return the result of typed_exec
def rename_subscription(name, new_name)
  current     = connection.quote_ident(name)
  replacement = connection.quote_ident(new_name)
  typed_exec("ALTER SUBSCRIPTION #{current} RENAME TO #{replacement}")
end

#replication_slotsArray<String>

Lists the current replication slots

Returns:

  • (Array<String>)

    replication slots



229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/pg/logical_replication/client.rb', line 229

# Selects every row of pg_replication_slots.
#
# @return the typed_exec result as-is (no to_a); rows carry the columns
#   slot_name, plugin, slot_type, database, temporary and active
def replication_slots
  slot_query = <<-SQL
    SELECT
      slot_name::TEXT,
      plugin::TEXT,
      slot_type::TEXT,
      database::TEXT,
      temporary,
      active
    FROM pg_replication_slots
  SQL
  typed_exec(slot_query)
end

#set_publication_owner(name, owner) ⇒ Object

Sets the owner of a publication

Parameters:

  • name (String)

    publication name

  • owner (String)

    new owner user name



317
318
319
# File 'lib/pg/logical_replication/client.rb', line 317

# Issues ALTER PUBLICATION ... OWNER TO.
#
# @param name [String] publication name (quoted as an identifier)
# @param owner [String] new owner user name (quoted as an identifier)
# @return the result of typed_exec
def set_publication_owner(name, owner)
  publication = connection.quote_ident(name)
  new_owner   = connection.quote_ident(owner)
  typed_exec("ALTER PUBLICATION #{publication} OWNER TO #{new_owner}")
end

#set_publication_tables(name, tables) ⇒ Object

Sets the tables included in a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names



292
293
294
# File 'lib/pg/logical_replication/client.rb', line 292

# Issues ALTER PUBLICATION ... SET TABLE with the given tables.
#
# @param name [String] publication name (quoted as an identifier)
# @param tables [Array<String>] table names (rendered via safe_list)
# @return the result of typed_exec
def set_publication_tables(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} SET TABLE #{table_list}")
end

#set_subscription_conninfo(name, conninfo_hash) ⇒ Object

Updates a subscription connection string

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    new external connection hash to the publisher node



99
100
101
102
# File 'lib/pg/logical_replication/client.rb', line 99

# Issues ALTER SUBSCRIPTION ... CONNECTION with a connection string derived
# from conninfo_hash via PG::Connection.parse_connect_args and escaped with
# escape_string before being placed between single quotes.
#
# @param name [String] subscription name (quoted as an identifier)
# @param conninfo_hash [Hash] new external connection hash to the publisher node
# @return the result of typed_exec
def set_subscription_conninfo(name, conninfo_hash)
  conninfo = PG::Connection.parse_connect_args(conninfo_hash)
  escaped  = connection.escape_string(conninfo)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{escaped}'")
end

#set_subscription_owner(name, owner) ⇒ Object

Sets the owner of the subscription

Parameters:

  • name (String)

    subscription name

  • owner (String)

    new owner user name



155
156
157
# File 'lib/pg/logical_replication/client.rb', line 155

# Issues ALTER SUBSCRIPTION ... OWNER TO.
#
# @param name [String] subscription name (quoted as an identifier)
# @param owner [String] new owner user name (quoted as an identifier)
# @return the result of typed_exec
def set_subscription_owner(name, owner)
  subscription = connection.quote_ident(name)
  new_owner    = connection.quote_ident(owner)
  typed_exec("ALTER SUBSCRIPTION #{subscription} OWNER TO #{new_owner}")
end

#set_subscription_publications(name, publications, options = {}) ⇒ Object

Changes list of subscribed publications

Parameters:

  • name (String)

    subscription name

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters



109
110
111
112
113
114
115
# File 'lib/pg/logical_replication/client.rb', line 109

# Issues ALTER SUBSCRIPTION ... SET PUBLICATION with the given publication
# list; the WITH clause is produced by the command builder from options.
#
# @param name [String] subscription name (quoted as an identifier)
# @param publications [Array<String>] publication names (rendered via safe_list)
# @param options [Hash] optional parameters
# @return the result of typed_exec
def set_subscription_publications(name, publications, options = {})
  subscription     = connection.quote_ident(name)
  publication_list = safe_list(publications)
  statement = <<-SQL
    ALTER SUBSCRIPTION #{subscription}
    SET PUBLICATION #{publication_list}
  SQL
  typed_exec(@command_builder.command_with_options(statement, "WITH", options))
end

#subscriber?(dbname = nil) ⇒ Boolean

Returns whether this database is subscribing to any publications

Returns:

  • (Boolean)

    true if there are any subscriptions, false otherwise



222
223
224
# File 'lib/pg/logical_replication/client.rb', line 222

# Tells whether #subscriptions returns at least one entry for dbname.
#
# @param dbname [String, nil] passed straight through to #subscriptions
# @return [Boolean] true if there are any subscriptions, false otherwise
def subscriber?(dbname = nil)
  found = subscriptions(dbname)
  found.any?
end

#subscriptions(dbname = nil) ⇒ Array<Hash>

Shows status and basic information about all subscriptions

Returns:

  • (Array<Hash>)

    a list of subscriptions, each a hash with the keys:

    subscription_name
    database_name
    owner
    worker_count
    enabled
    subscription_dsn
    slot_name
    publications
    remote_replication_lsn
    local_replication_lsn
    


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/pg/logical_replication/client.rb', line 181

# Runs a status query over pg_subscription (joined to pg_user, pg_database,
# pg_replication_origin_status and pg_stat_subscription) and returns the
# rows, optionally narrowed to one database.
#
# @param dbname [String, nil] when given, only rows whose "database_name"
#   equals this value are returned; the filtering happens in Ruby, not in SQL
# @return [Array<Hash>] rows with the keys subscription_name, database_name,
#   owner, worker_count, enabled, subscription_dsn, slot_name, publications,
#   remote_replication_lsn and local_replication_lsn
def subscriptions(dbname = nil)
  status_query = <<-SQL
    SELECT
      sub.subname::TEXT         AS subscription_name,
      pg_database.datname::TEXT AS database_name,
      pg_user.usename::TEXT     AS owner,
      COUNT(sub_stat.pid)       AS worker_count,
      sub.subenabled            AS enabled,
      sub.subconninfo           AS subscription_dsn,
      sub.subslotname::TEXT     AS slot_name,
      sub.subpublications       AS publications,
      stat.remote_lsn::TEXT     AS remote_replication_lsn,
      stat.local_lsn::TEXT      AS local_replication_lsn
    FROM
      pg_subscription AS sub
      JOIN pg_user
        ON sub.subowner = usesysid
      JOIN pg_database
        ON sub.subdbid = pg_database.oid
      LEFT JOIN pg_replication_origin_status stat
        ON concat('pg_', sub.oid) = stat.external_id
      LEFT JOIN pg_stat_subscription sub_stat
        ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL
    GROUP BY
      sub.subname,
      pg_database.datname,
      pg_user.usename,
      sub.subenabled,
      sub.subconninfo,
      sub.subslotname,
      sub.subpublications,
      stat.remote_lsn,
      stat.local_lsn
  SQL
  rows = typed_exec(status_query).to_a

  # No database given: hand back every row untouched.
  return rows unless dbname

  rows.select { |row| row["database_name"] == dbname }
end

#sync_subscription(name, options = {}) ⇒ Object

Fetch missing table information from publisher

Parameters:

  • name (String)

    subscription name

  • options (Hash) (defaults to: {})

    optional parameters



121
122
123
124
125
126
# File 'lib/pg/logical_replication/client.rb', line 121

# Issues ALTER SUBSCRIPTION ... REFRESH PUBLICATION; the WITH clause is
# produced by the command builder from options.
#
# @param name [String] subscription name (quoted as an identifier)
# @param options [Hash] optional parameters
# @return the result of typed_exec
def sync_subscription(name, options = {})
  subscription = connection.quote_ident(name)
  refresh_statement = <<-SQL
    ALTER SUBSCRIPTION #{subscription} REFRESH PUBLICATION
  SQL
  typed_exec(@command_builder.command_with_options(refresh_statement, "WITH", options))
end

#tables_in_publication(name) ⇒ Array<String>

Lists the tables currently in the publication

Parameters:

  • name (String)

    publication name

Returns:

  • (Array<String>)

    table names



341
342
343
344
345
346
347
# File 'lib/pg/logical_replication/client.rb', line 341

# Looks up the table names listed in pg_publication_tables for a publication.
# The publication name is passed to typed_exec as a bind argument ($1)
# rather than interpolated into the SQL text.
#
# @param name [String] publication name
# @return [Array] the flattened values of the query result
def tables_in_publication(name)
  table_query = <<-SQL
    SELECT tablename::TEXT
    FROM pg_publication_tables
    WHERE pubname = $1
  SQL
  typed_exec(table_query, name).values.flatten
end

#wal_retained_bytesArray<Hash<String,String>>

Reports on the bytes of WAL being retained for each replication slot. This method must be run on the publisher node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned WAL bytes and replication slot names, one for each replication process



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pg/logical_replication/client.rb', line 43

# Queries pg_replication_slots (pgoutput slots only) for the byte distance
# between the current WAL insert LSN and each slot's restart_lsn.
#
# @return [Array] the typed_exec result converted with to_a; rows carry the
#   columns retained_bytes and slot_name
def wal_retained_bytes
  retained_query = <<-SQL
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,
      slot_name::TEXT
    FROM
      pg_replication_slots
    WHERE
      plugin = 'pgoutput'
  SQL
  typed_exec(retained_query).to_a
end