Class: PG::LogicalReplication::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/logical_replication/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Client

Returns a new instance of Client.

Parameters:

  • connection (PG::Connection)

    Database Connection



18
19
20
21
# File 'lib/pg/logical_replication/client.rb', line 18

# Wraps an existing database connection and prepares the command builder
# that the option-taking methods of this client delegate to.
#
# @param connection [PG::Connection] database connection; the same object is
#   handed to PG::LogicalReplication::CommandBuilder
def initialize(connection)
  @connection = connection
  @command_builder = PG::LogicalReplication::CommandBuilder.new(@connection)
end

Instance Attribute Details

#command_builder ⇒ Object (readonly)

Returns the value of attribute command_builder.



7
8
9
# File 'lib/pg/logical_replication/client.rb', line 7

# Reader for the command builder stored in @command_builder.
#
# @return the value currently held in @command_builder
def command_builder; @command_builder; end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



6
7
8
# File 'lib/pg/logical_replication/client.rb', line 6

# Reader for the database connection stored in @connection.
#
# @return the value currently held in @connection
def connection; @connection; end

Class Method Details

.type_map_for_queries(connection) ⇒ Object



9
10
11
# File 'lib/pg/logical_replication/client.rb', line 9

# Returns the PG::BasicTypeMapForQueries cached in @type_map_for_queries,
# building it on first use.
#
# The map is memoized on the receiver: +connection+ is only consulted when no
# map has been cached yet; later calls return the cached map regardless of
# the argument passed.
#
# @param connection [PG::Connection] connection used to build the map
# @return [PG::BasicTypeMapForQueries]
def self.type_map_for_queries(connection)
  return @type_map_for_queries if @type_map_for_queries

  @type_map_for_queries = PG::BasicTypeMapForQueries.new(connection)
end

.type_map_for_results(connection) ⇒ Object



13
14
15
# File 'lib/pg/logical_replication/client.rb', line 13

# Returns the PG::BasicTypeMapForResults cached in @type_map_for_results,
# building it on first use.
#
# The map is memoized on the receiver: +connection+ is only consulted when no
# map has been cached yet; later calls return the cached map regardless of
# the argument passed.
#
# @param connection [PG::Connection] connection used to build the map
# @return [PG::BasicTypeMapForResults]
def self.type_map_for_results(connection)
  return @type_map_for_results if @type_map_for_results

  @type_map_for_results = PG::BasicTypeMapForResults.new(connection)
end

Instance Method Details

#add_tables_to_publication(name, tables) ⇒ Object

Adds tables to a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to add



282
283
284
# File 'lib/pg/logical_replication/client.rb', line 282

# Runs ALTER PUBLICATION ... ADD TABLE for the given tables.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param tables [Array<String>] table names to add (rendered by safe_list)
# @return the value returned by typed_exec
def add_tables_to_publication(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} ADD TABLE #{table_list}")
end

#alter_publication_options(name, options) ⇒ Object

Alters parameters originally set by CREATE PUBLICATION

Parameters:

  • name (String)

    publication name

  • options (Hash)

    parameters to set



306
307
308
309
# File 'lib/pg/logical_replication/client.rb', line 306

# Runs ALTER PUBLICATION ... SET with the given options; the option clause is
# produced by command_builder.command_with_options.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param options [Hash] parameters to set
# @return the value returned by typed_exec
def alter_publication_options(name, options)
  alter_prefix = "ALTER PUBLICATION #{connection.quote_ident(name)}"
  statement = command_builder.command_with_options(alter_prefix, "SET", options)
  typed_exec(statement)
end

#alter_subscription_options(name, options) ⇒ Object

Alters parameters originally set by CREATE SUBSCRIPTION

Parameters:

  • name (String)

    subscription name

  • options (Hash)

    parameters to set



144
145
146
147
# File 'lib/pg/logical_replication/client.rb', line 144

# Runs ALTER SUBSCRIPTION ... SET with the given options; the option clause is
# produced by command_builder.command_with_options.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param options [Hash] parameters to set
# @return the value returned by typed_exec
def alter_subscription_options(name, options)
  alter_prefix = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}"
  statement = command_builder.command_with_options(alter_prefix, "SET", options)
  typed_exec(statement)
end

#create_logical_replication_slot(name) ⇒ Object

Creates a logical replication slot

Parameters:

  • name (String)

    logical replication slot name



82
83
84
# File 'lib/pg/logical_replication/client.rb', line 82

# Calls pg_create_logical_replication_slot with the 'pgoutput' plugin.
#
# @param name [String] logical replication slot name (escaped with
#   connection.escape_literal)
# @return the value returned by typed_exec
def create_logical_replication_slot(name)
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_create_logical_replication_slot(#{slot_literal}, 'pgoutput')")
end

#create_publication(name, all_tables = false, tables = [], options = {}) ⇒ Object

Creates a new publication

Parameters:

  • name (String)

    publication name

  • all_tables (Boolean) (defaults to: false)

    replicate changes for all tables, including ones created in the future

  • tables (Array<String>) (defaults to: [])

    tables to be added to the publication, ignored if all_tables is true

  • options (Hash) (defaults to: {})

    optional parameters



268
269
270
271
272
273
274
275
276
# File 'lib/pg/logical_replication/client.rb', line 268

# Creates a new publication.
#
# The statement is CREATE PUBLICATION <name>, optionally followed by
# FOR ALL TABLES or FOR TABLE <list>, then handed to
# command_builder.command_with_options with the "WITH" keyword.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param all_tables [Boolean] when truthy, publish FOR ALL TABLES
# @param tables [Array<String>, nil] tables to publish; ignored when
#   all_tables is truthy. nil or an empty list adds no FOR TABLE clause.
# @param options [Hash] optional parameters passed to command_with_options
# @return the value returned by typed_exec
def create_publication(name, all_tables = false, tables = [], options = {})
  # A nil table list is treated like an empty one instead of raising
  # NoMethodError on nil.empty?.
  target_clause =
    if all_tables
      " FOR ALL TABLES"
    elsif tables && !tables.empty?
      " FOR TABLE #{safe_list(tables)}"
    end

  # Interpolating a nil clause yields an empty string.
  base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}#{target_clause}"
  typed_exec(command_builder.command_with_options(base_command, "WITH", options))
end

#create_subscription(name, conninfo_hash, publications, options = {}) ⇒ Object

Creates a subscription to a publisher node

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    publisher node connection info

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters for CREATE SUBSCRIPTION



61
62
63
64
65
66
67
68
69
# File 'lib/pg/logical_replication/client.rb', line 61

# Creates a subscription to a publisher node.
#
# The connection hash is turned into a string by
# PG::Connection.parse_connect_args, escaped with connection.escape_string
# and embedded as the CONNECTION literal.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param conninfo_hash [Hash] publisher node connection info
# @param publications [Array<String>] publication names (rendered by safe_list)
# @param options [Hash] optional parameters passed to command_with_options
#   with the "WITH" keyword
# @return the value returned by typed_exec
def create_subscription(name, conninfo_hash, publications, options = {})
  conninfo = PG::Connection.parse_connect_args(conninfo_hash)
  escaped_conninfo = connection.escape_string(conninfo)
  subscription_sql = <<-SQL
    CREATE SUBSCRIPTION #{connection.quote_ident(name)}
           CONNECTION '#{escaped_conninfo}'
           PUBLICATION #{safe_list(publications)}
  SQL
  typed_exec(command_builder.command_with_options(subscription_sql, "WITH", options))
end

#disable_subscription(name) ⇒ Object

Disables the running subscription

Parameters:

  • name (String)

    subscription name



136
137
138
# File 'lib/pg/logical_replication/client.rb', line 136

# Runs ALTER SUBSCRIPTION ... DISABLE.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @return the value returned by typed_exec
def disable_subscription(name)
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} DISABLE")
end

#drop_publication(name, ifexists = false) ⇒ Object

Remove a publication

Parameters:

  • name (String)

    publication name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the publication does not exist



331
332
333
# File 'lib/pg/logical_replication/client.rb', line 331

# Runs DROP PUBLICATION, optionally with IF EXISTS.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param ifexists [Boolean] when truthy, " IF EXISTS" is added to the statement
# @return the value returned by typed_exec
def drop_publication(name, ifexists = false)
  existence_guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP PUBLICATION#{existence_guard} #{connection.quote_ident(name)}")
end

#drop_replication_slot(name) ⇒ Object

Drops the physical or logical replication slot. Note: a logical slot can only be dropped while connected to the same database in which it was created.

Parameters:

  • name (String)

    replication slot name



89
90
91
# File 'lib/pg/logical_replication/client.rb', line 89

# Calls pg_drop_replication_slot for the named slot.
#
# @param name [String] replication slot name (escaped with
#   connection.escape_literal)
# @return the value returned by typed_exec
def drop_replication_slot(name)
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_drop_replication_slot(#{slot_literal})")
end

#drop_subscription(name, ifexists = false) ⇒ Object

Disconnects the subscription and removes it

Parameters:

  • name (String)

    subscription name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the subscription does not exist



75
76
77
# File 'lib/pg/logical_replication/client.rb', line 75

# Runs DROP SUBSCRIPTION, optionally with IF EXISTS.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param ifexists [Boolean] when truthy, " IF EXISTS" is added to the statement
# @return the value returned by typed_exec
def drop_subscription(name, ifexists = false)
  existence_guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP SUBSCRIPTION#{existence_guard} #{connection.quote_ident(name)}")
end

#enable_subscription(name) ⇒ Object

Enables the previously disabled subscription

Parameters:

  • name (String)

    subscription name



129
130
131
# File 'lib/pg/logical_replication/client.rb', line 129

# Runs ALTER SUBSCRIPTION ... ENABLE.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @return the value returned by typed_exec
def enable_subscription(name)
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} ENABLE")
end

#lag_bytes ⇒ Array<Hash<String,String>>

Reports on replication lag from publisher to subscriber nodes. This method must be run on the publisher node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned lag and application names, one for each replication process



28
29
30
31
32
33
34
35
36
# File 'lib/pg/logical_replication/client.rb', line 28

# Queries pg_stat_replication for the byte distance between the current WAL
# insert LSN and each row's flush_lsn.
#
# @return [Array] typed_exec result converted with to_a; the query selects
#   the columns lag_bytes and application_name
def lag_bytes
  lag_query = <<-SQL
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,
      application_name
    FROM
      pg_stat_replication
  SQL
  typed_exec(lag_query).to_a
end

#publications ⇒ Array<String>

Lists the current publications

Returns:

  • (Array<String>)

    one entry per publication, carrying its name, owner, puballtables, pubinsert, pubupdate and pubdelete values



243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/pg/logical_replication/client.rb', line 243

# Queries pg_publication joined to pg_user for the current publications.
#
# The query selects name, owner, puballtables, pubinsert, pubupdate and
# pubdelete for each publication.
#
# @return the value returned by typed_exec (not converted with to_a)
def publications
  publications_query = <<-SQL
    SELECT
      pubname::TEXT AS name,
      usename::TEXT AS owner,
      puballtables,
      pubinsert,
      pubupdate,
      pubdelete
    FROM
      pg_publication
      JOIN pg_user ON pubowner = usesysid
  SQL
  typed_exec(publications_query)
end

#publishes?(publication_name) ⇒ Boolean

Returns:

  • (Boolean)


258
259
260
# File 'lib/pg/logical_replication/client.rb', line 258

# Tells whether a publication with the given name is listed by #publications.
#
# @param publication_name [String] name compared against each row's "name"
# @return [Boolean] true when some row's "name" equals publication_name
def publishes?(publication_name)
  known_names = publications.map { |row| row["name"] }
  known_names.include?(publication_name)
end

#remove_tables_from_publication(name, tables) ⇒ Object

Removes tables from a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names to remove



298
299
300
# File 'lib/pg/logical_replication/client.rb', line 298

# Runs ALTER PUBLICATION ... DROP TABLE for the given tables.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param tables [Array<String>] table names to remove (rendered by safe_list)
# @return the value returned by typed_exec
def remove_tables_from_publication(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} DROP TABLE #{table_list}")
end

#rename_publication(name, new_name) ⇒ Object

Renames a publication

Parameters:

  • name (String)

    current publication name

  • new_name (String)

    new publication name



323
324
325
# File 'lib/pg/logical_replication/client.rb', line 323

# Runs ALTER PUBLICATION ... RENAME TO.
#
# @param name [String] current publication name
# @param new_name [String] new publication name
# @return the value returned by typed_exec
def rename_publication(name, new_name)
  current_ident = connection.quote_ident(name)
  renamed_ident = connection.quote_ident(new_name)
  typed_exec("ALTER PUBLICATION #{current_ident} RENAME TO #{renamed_ident}")
end

#rename_subscription(name, new_name) ⇒ Object

Renames the subscription

Parameters:

  • name (String)

    current subscription name

  • new_name (String)

    new subscription name



161
162
163
# File 'lib/pg/logical_replication/client.rb', line 161

# Runs ALTER SUBSCRIPTION ... RENAME TO.
#
# @param name [String] current subscription name
# @param new_name [String] new subscription name
# @return the value returned by typed_exec
def rename_subscription(name, new_name)
  current_ident = connection.quote_ident(name)
  renamed_ident = connection.quote_ident(new_name)
  typed_exec("ALTER SUBSCRIPTION #{current_ident} RENAME TO #{renamed_ident}")
end

#replication_slots ⇒ Array<String>

Lists the current replication slots

Returns:

  • (Array<String>)

    one entry per replication slot, carrying its slot_name, plugin, slot_type, database, temporary and active values



227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/pg/logical_replication/client.rb', line 227

# Queries pg_replication_slots for the current replication slots.
#
# The query selects slot_name, plugin, slot_type, database, temporary and
# active for each slot.
#
# @return the value returned by typed_exec (not converted with to_a)
def replication_slots
  slots_query = <<-SQL
    SELECT
      slot_name::TEXT,
      plugin::TEXT,
      slot_type::TEXT,
      database::TEXT,
      temporary,
      active
    FROM pg_replication_slots
  SQL
  typed_exec(slots_query)
end

#set_publication_owner(name, owner) ⇒ Object

Sets the owner of a publication

Parameters:

  • name (String)

    publication name

  • owner (String)

    new owner user name



315
316
317
# File 'lib/pg/logical_replication/client.rb', line 315

# Runs ALTER PUBLICATION ... OWNER TO.
#
# @param name [String] publication name
# @param owner [String] new owner user name
# @return the value returned by typed_exec
def set_publication_owner(name, owner)
  publication = connection.quote_ident(name)
  new_owner   = connection.quote_ident(owner)
  typed_exec("ALTER PUBLICATION #{publication} OWNER TO #{new_owner}")
end

#set_publication_tables(name, tables) ⇒ Object

Sets the tables included in a publication

Parameters:

  • name (String)

    publication name

  • tables (Array<String>)

    table names



290
291
292
# File 'lib/pg/logical_replication/client.rb', line 290

# Runs ALTER PUBLICATION ... SET TABLE with the given tables.
#
# @param name [String] publication name (quoted with connection.quote_ident)
# @param tables [Array<String>] table names (rendered by safe_list)
# @return the value returned by typed_exec
def set_publication_tables(name, tables)
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} SET TABLE #{table_list}")
end

#set_subscription_conninfo(name, conninfo_hash) ⇒ Object

Updates a subscription connection string

Parameters:

  • name (String)

    subscription name

  • conninfo_hash (Hash)

    new external connection hash to the publisher node



97
98
99
100
# File 'lib/pg/logical_replication/client.rb', line 97

# Runs ALTER SUBSCRIPTION ... CONNECTION with a new connection string.
#
# The hash is turned into a string by PG::Connection.parse_connect_args and
# escaped with connection.escape_string before being embedded in quotes.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param conninfo_hash [Hash] new connection info for the publisher node
# @return the value returned by typed_exec
def set_subscription_conninfo(name, conninfo_hash)
  conninfo = PG::Connection.parse_connect_args(conninfo_hash)
  escaped_conninfo = connection.escape_string(conninfo)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{escaped_conninfo}'")
end

#set_subscription_owner(name, owner) ⇒ Object

Sets the owner of the subscription

Parameters:

  • name (String)

    subscription name

  • owner (String)

    new owner user name



153
154
155
# File 'lib/pg/logical_replication/client.rb', line 153

# Runs ALTER SUBSCRIPTION ... OWNER TO.
#
# @param name [String] subscription name
# @param owner [String] new owner user name
# @return the value returned by typed_exec
def set_subscription_owner(name, owner)
  subscription = connection.quote_ident(name)
  new_owner    = connection.quote_ident(owner)
  typed_exec("ALTER SUBSCRIPTION #{subscription} OWNER TO #{new_owner}")
end

#set_subscription_publications(name, publications, options = {}) ⇒ Object

Changes list of subscribed publications

Parameters:

  • name (String)

    subscription name

  • publications (Array<String>)

    publication names to subscribe to

  • options (Hash) (defaults to: {})

    optional parameters



107
108
109
110
111
112
113
# File 'lib/pg/logical_replication/client.rb', line 107

# Changes the list of publications a subscription is subscribed to.
#
# Builds ALTER SUBSCRIPTION ... SET PUBLICATION and hands it to
# command_builder.command_with_options with the "WITH" keyword. The builder
# is reached through the command_builder reader, as in create_subscription
# and the alter_*_options methods, rather than the instance variable.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param publications [Array<String>] publication names (rendered by safe_list)
# @param options [Hash] optional parameters passed to command_with_options
# @return the value returned by typed_exec
def set_subscription_publications(name, publications, options = {})
  base_command = <<-SQL
    ALTER SUBSCRIPTION #{connection.quote_ident(name)}
    SET PUBLICATION #{safe_list(publications)}
  SQL
  typed_exec(command_builder.command_with_options(base_command, "WITH", options))
end

#subscriber?(dbname = nil) ⇒ Boolean

Returns if this database is subscribing to any publications

Returns:

  • (Boolean)

    true if there are any subscriptions, false otherwise



220
221
222
# File 'lib/pg/logical_replication/client.rb', line 220

# Tells whether #subscriptions reports anything for the given database.
#
# @param dbname [String, nil] forwarded unchanged to #subscriptions
# @return [Boolean] result of any? on the subscriptions list
def subscriber?(dbname = nil)
  matching = subscriptions(dbname)
  matching.any?
end

#subscriptions(dbname = nil) ⇒ Array<Hash>

Shows status and basic information about all subscriptions

Returns:

  • (Array<Hash>)

    a list of subscriptions, each a hash with the following keys:

    subscription_name
    database_name
    owner
    worker_count
    enabled
    subscription_dsn
    slot_name
    publications
    remote_replication_lsn
    local_replication_lsn
    


179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/pg/logical_replication/client.rb', line 179

# Shows status and basic information about subscriptions.
#
# Runs one query over pg_subscription (joined to pg_user, pg_database,
# pg_replication_origin_status and pg_stat_subscription) and converts the
# result with to_a. Filtering by database happens afterwards on the returned
# rows, not in SQL.
#
# @param dbname [String, nil] when truthy, only rows whose "database_name"
#   equals it are returned; otherwise every row is returned
# @return [Array<Hash>] rows with the columns subscription_name,
#   database_name, owner, worker_count, enabled, subscription_dsn, slot_name,
#   publications, remote_replication_lsn and local_replication_lsn
def subscriptions(dbname = nil)
  subscriptions_query = <<-SQL
    SELECT
      sub.subname::TEXT         AS subscription_name,
      pg_database.datname::TEXT AS database_name,
      pg_user.usename::TEXT     AS owner,
      COUNT(sub_stat.pid)       AS worker_count,
      sub.subenabled            AS enabled,
      sub.subconninfo           AS subscription_dsn,
      sub.subslotname::TEXT     AS slot_name,
      sub.subpublications       AS publications,
      stat.remote_lsn::TEXT     AS remote_replication_lsn,
      stat.local_lsn::TEXT      AS local_replication_lsn
    FROM
      pg_subscription AS sub
      JOIN pg_user
        ON sub.subowner = usesysid
      JOIN pg_database
        ON sub.subdbid = pg_database.oid
      LEFT JOIN pg_replication_origin_status stat
        ON concat('pg_', sub.oid) = stat.external_id
      LEFT JOIN pg_stat_subscription sub_stat
        ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL
    GROUP BY
      sub.subname,
      pg_database.datname,
      pg_user.usename,
      sub.subenabled,
      sub.subconninfo,
      sub.subslotname,
      sub.subpublications,
      stat.remote_lsn,
      stat.local_lsn
  SQL
  rows = typed_exec(subscriptions_query).to_a

  return rows unless dbname

  rows.select { |row| row["database_name"] == dbname }
end

#sync_subscription(name, options = {}) ⇒ Object

Fetch missing table information from publisher

Parameters:

  • name (String)

    subscription name

  • options (Hash) (defaults to: {})

    optional parameters



119
120
121
122
123
124
# File 'lib/pg/logical_replication/client.rb', line 119

# Fetches missing table information from the publisher by running
# ALTER SUBSCRIPTION ... REFRESH PUBLICATION.
#
# The statement is handed to command_builder.command_with_options with the
# "WITH" keyword. The builder is reached through the command_builder reader,
# as in create_subscription and the alter_*_options methods, rather than the
# instance variable.
#
# @param name [String] subscription name (quoted with connection.quote_ident)
# @param options [Hash] optional parameters passed to command_with_options
# @return the value returned by typed_exec
def sync_subscription(name, options = {})
  base_command = <<-SQL
    ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION
  SQL
  typed_exec(command_builder.command_with_options(base_command, "WITH", options))
end

#tables_in_publication(name) ⇒ Array<String>

Lists the tables currently in the publication

Parameters:

  • name (String)

    publication name

Returns:

  • (Array<String>)

    table names



339
340
341
342
343
344
345
# File 'lib/pg/logical_replication/client.rb', line 339

# Lists the tables currently in a publication by querying
# pg_publication_tables.
#
# @param name [String] publication name, passed to typed_exec as the bind
#   value for $1
# @return [Array] the result's values, flattened
def tables_in_publication(name)
  tables_query = <<-SQL
    SELECT tablename::TEXT
    FROM pg_publication_tables
    WHERE pubname = $1
  SQL
  typed_exec(tables_query, name).values.flatten
end

#wal_retained_bytes ⇒ Array<Hash<String,String>>

Reports on the bytes of WAL being retained for each replication slot. This method must be run on the publisher node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned WAL bytes and replication slot names, one for each replication process



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pg/logical_replication/client.rb', line 43

# Queries pg_replication_slots for the byte distance between the current WAL
# insert LSN and each slot's restart_lsn, limited to slots whose plugin is
# 'pgoutput'.
#
# @return [Array] typed_exec result converted with to_a; the query selects
#   the columns retained_bytes and slot_name
def wal_retained_bytes
  retained_query = <<-SQL
    SELECT
      pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,
      slot_name::TEXT
    FROM
      pg_replication_slots
    WHERE
      plugin = 'pgoutput'
  SQL
  typed_exec(retained_query).to_a
end