Class: PG::Pglogical::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/pglogical/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Client

Returns a new instance of Client.

Parameters:

  • connection (PostgreSQLAdapter)

    ActiveRecord database connection



9
10
11
# File 'lib/pg/pglogical/client.rb', line 9

# Builds a client around an existing database connection.
#
# @param connection [PostgreSQLAdapter] ActiveRecord database connection;
#   it is stored in @connection for later use
def initialize(connection)
  @connection = connection
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



6
7
8
# File 'lib/pg/pglogical/client.rb', line 6

# Reader for the connection held in @connection.
#
# @return [Object] the current value of @connection
def connection
  @connection
end

Instance Method Details

#disable ⇒ Object

Disables pglogical postgres extensions



34
35
36
37
# File 'lib/pg/pglogical/client.rb', line 34

# Disables the pglogical postgres extensions.
#
# "pglogical" is always disabled first. "pglogical_origin" is disabled
# afterwards only when the connection reports a server version below 90_500.
def disable
  connection.disable_extension("pglogical")
  return unless connection.postgresql_version < 90_500

  connection.disable_extension("pglogical_origin")
end

#enable ⇒ Object

Enables pglogical postgres extensions



28
29
30
31
# File 'lib/pg/pglogical/client.rb', line 28

# Enables the pglogical postgres extensions.
#
# When the connection reports a server version below 90_500 the
# "pglogical_origin" extension is enabled first; "pglogical" is always
# enabled last.
def enable
  if connection.postgresql_version < 90_500
    connection.enable_extension("pglogical_origin")
  end

  connection.enable_extension("pglogical")
end

#enabled? ⇒ Boolean

Returns whether pglogical is currently enabled or not

Returns:

  • (Boolean)


21
22
23
24
25
# File 'lib/pg/pglogical/client.rb', line 21

# Returns whether pglogical is currently enabled or not.
#
# The result is false unless the extension is installed and "pglogical" is
# enabled. On servers reporting version 90_500 or later that is sufficient;
# on older servers "pglogical_origin" must be enabled as well.
#
# @return [Boolean]
def enabled?
  if installed? && connection.extension_enabled?("pglogical")
    connection.postgresql_version >= 90_500 || connection.extension_enabled?("pglogical_origin")
  else
    false
  end
end

#installed? ⇒ Boolean

Returns whether the pglogical postgres extension is installed or not

Returns:

  • (Boolean)


14
15
16
# File 'lib/pg/pglogical/client.rb', line 14

# Returns whether the pglogical postgres extension is installed or not.
#
# The check asks pg_available_extensions whether a row named 'pglogical'
# exists.
#
# @return [Boolean] strictly true or false
def installed?
  available = connection.select_value(
    "SELECT EXISTS(SELECT * FROM pg_available_extensions WHERE name = 'pglogical')"
  )

  # NOTE(review): some adapter versions are believed to hand back the raw
  # PostgreSQL text form ("t"/"f") instead of a Ruby boolean. "f" is truthy in
  # Ruby, so the value is normalised rather than returned as-is.
  [true, "t", "true"].include?(available)
end

#lag_bytes ⇒ Array<Hash<String,String>>

Reports on replication lag from provider to subscriber nodes. This method must be run on the provider node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned lag and application names, one for each replication process



47
48
49
50
51
52
53
54
# File 'lib/pg/pglogical/client.rb', line 47

# Reports on replication lag from provider to subscriber nodes.
# This method must be run on the provider node.
#
# PostgreSQL 10 renamed the xlog helper functions and the *_location columns
# of pg_stat_replication, so the names are chosen from the server version
# reported by the connection. Servers older than 10 get the same SQL as before.
#
# @return [Array<Hash<String,String>>] List of returned lag and application
#   names, one for each replication process
def lag_bytes
  if connection.postgresql_version >= 100_000
    diff_fn = "pg_wal_lsn_diff"
    insert_fn = "pg_current_wal_insert_lsn"
    flush_col = "flush_lsn"
  else
    diff_fn = "pg_xlog_location_diff"
    insert_fn = "pg_current_xlog_insert_location"
    flush_col = "flush_location"
  end

  typed_exec(<<-SQL).to_a
    SELECT
      #{diff_fn}(#{insert_fn}(), #{flush_col}) AS lag_bytes,
      application_name
    FROM pg_stat_replication
  SQL
end

#node_create(name, dsn) ⇒ Object

Creates a node

Parameters:

  • name (String)
  • dsn (String)

    external connection string to the node



78
79
80
# File 'lib/pg/pglogical/client.rb', line 78

# Creates a node by calling pglogical.create_node.
#
# @param name [String]
# @param dsn [String] external connection string to the node
def node_create(name, dsn)
  create_sql = "SELECT pglogical.create_node($1, $2)"
  typed_exec(create_sql, name, dsn)
end

#node_drop(name, ifexists = false) ⇒ Object

Drops the node

Parameters:

  • name (String)
  • ifexists (Boolean) (defaults to: false)


110
111
112
# File 'lib/pg/pglogical/client.rb', line 110

# Drops the node by calling pglogical.drop_node.
#
# @param name [String]
# @param ifexists [Boolean] passed through as the second argument of
#   pglogical.drop_node
def node_drop(name, ifexists = false)
  drop_sql = "SELECT pglogical.drop_node($1, $2)"
  typed_exec(drop_sql, name, ifexists)
end

#node_dsn_update(name, dsn) ⇒ Boolean

Updates a node connection string

NOTE: This method relies on the internals of the pglogical tables rather than a published API.

NOTE: Disable subscriptions involving the node before calling this method for a provider node in a subscriber database.

Parameters:

  • name (String)
  • dsn (String)

    new external connection string to the node

Returns:

  • (Boolean)

    true if the dsn was updated, false otherwise



92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/pg/pglogical/client.rb', line 92

# Updates a node connection string.
#
# NOTE: This writes directly to pglogical.node_interface (looking the node up
# in pglogical.node) rather than going through a pglogical function.
#
# @param name [String]
# @param dsn [String] new external connection string to the node
# @return [Boolean] true if exactly one row was updated, false otherwise
def node_dsn_update(name, dsn)
  update_sql = <<-SQL
    UPDATE pglogical.node_interface
    SET if_dsn = $2
    WHERE if_nodeid = (
      SELECT node_id
      FROM pglogical.node
      WHERE node_name = $1
    )
  SQL

  typed_exec(update_sql, name, dsn).cmd_tuples == 1
end

#nodes ⇒ Object



114
115
116
117
118
119
120
# File 'lib/pg/pglogical/client.rb', line 114

# Queries every node together with its interface connection string.
#
# Each row exposes the columns "name" (node_name) and "conn_string" (if_dsn).
#
# @return [Object] whatever typed_exec returns for the query
def nodes
  list_sql = <<-SQL
    SELECT node_name AS name, if_dsn AS conn_string
    FROM pglogical.node join pglogical.node_interface
      ON if_nodeid = node_id
  SQL

  typed_exec(list_sql)
end

#replication_set_add_all_tables(set_name, schema_names, sync = false) ⇒ Object

Adds all tables in the given schemas to the replication set

Parameters:

  • set_name (String)

    replication set name

  • schema_names (Array<String>)

    list of schema names

  • sync (Boolean) (defaults to: false)

    sync table data to all the subscribers to the replication set



299
300
301
302
# File 'lib/pg/pglogical/client.rb', line 299

# Adds all tables in the given schemas to the replication set.
#
# @param set_name [String] replication set name
# @param schema_names [Array<String>] list of schema names
# @param sync [Boolean] sync table data to all the subscribers to the
#   replication set
def replication_set_add_all_tables(set_name, schema_names, sync = false)
  add_sql = "SELECT pglogical.replication_set_add_all_tables($1, $2, $3)"
  typed_exec(add_sql, set_name, schema_names, sync)
end

#replication_set_add_table(set_name, table_name, sync = false) ⇒ Object

Adds a table to a replication set

Parameters:

  • set_name (String)

    replication set name

  • table_name (String)

    table name to add

  • sync (Boolean) (defaults to: false)

    sync the table on all subscribers to the given replication set



289
290
291
292
# File 'lib/pg/pglogical/client.rb', line 289

# Adds a table to a replication set.
#
# @param set_name [String] replication set name
# @param table_name [String] table name to add
# @param sync [Boolean] sync the table on all subscribers to the given
#   replication set
def replication_set_add_table(set_name, table_name, sync = false)
  add_sql = "SELECT pglogical.replication_set_add_table($1, $2, $3)"
  typed_exec(add_sql, set_name, table_name, sync)
end

#replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object

Alters an existing replication set

Parameters:

  • set_name (String)

    replication set name

  • insert (Boolean) (defaults to: true)

    replicate INSERT events

  • update (Boolean) (defaults to: true)

    replicate UPDATE events

  • delete (Boolean) (defaults to: true)

    replicate DELETE events

  • truncate (Boolean) (defaults to: true)

    replicate TRUNCATE events



272
273
274
275
# File 'lib/pg/pglogical/client.rb', line 272

# Alters an existing replication set.
#
# @param set_name [String] replication set name
# @param insert [Boolean] replicate INSERT events
# @param update [Boolean] replicate UPDATE events
# @param delete [Boolean] replicate DELETE events
# @param truncate [Boolean] replicate TRUNCATE events
def replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true)
  alter_sql = "SELECT pglogical.alter_replication_set($1, $2, $3, $4, $5)"
  typed_exec(alter_sql, set_name, insert, update, delete, truncate)
end

#replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object

Creates a new replication set

Parameters:

  • set_name (String)

    new replication set name

  • insert (Boolean) (defaults to: true)

    replicate INSERT events

  • update (Boolean) (defaults to: true)

    replicate UPDATE events

  • delete (Boolean) (defaults to: true)

    replicate DELETE events

  • truncate (Boolean) (defaults to: true)

    replicate TRUNCATE events



260
261
262
263
# File 'lib/pg/pglogical/client.rb', line 260

# Creates a new replication set.
#
# @param set_name [String] new replication set name
# @param insert [Boolean] replicate INSERT events
# @param update [Boolean] replicate UPDATE events
# @param delete [Boolean] replicate DELETE events
# @param truncate [Boolean] replicate TRUNCATE events
def replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true)
  create_sql = "SELECT pglogical.create_replication_set($1, $2, $3, $4, $5)"
  typed_exec(create_sql, set_name, insert, update, delete, truncate)
end

#replication_set_drop(set_name) ⇒ Object

Removes a replication set

Parameters:

  • set_name (String)

    replication set name



280
281
282
# File 'lib/pg/pglogical/client.rb', line 280

# Removes a replication set.
#
# @param set_name [String] replication set name
def replication_set_drop(set_name)
  drop_sql = "SELECT pglogical.drop_replication_set($1)"
  typed_exec(drop_sql, set_name)
end

#replication_set_remove_table(set_name, table_name) ⇒ Object

Removes a table from a replication set

Parameters:

  • set_name (String)

    replication set name

  • table_name (String)

    table to remove



308
309
310
311
# File 'lib/pg/pglogical/client.rb', line 308

# Removes a table from a replication set.
#
# @param set_name [String] replication set name
# @param table_name [String] table to remove
def replication_set_remove_table(set_name, table_name)
  remove_sql = "SELECT pglogical.replication_set_remove_table($1, $2)"
  typed_exec(remove_sql, set_name, table_name)
end

#replication_sets ⇒ Array<String>

Lists the current replication sets

Returns:

  • (Array<String>)

    the replication sets



249
250
251
# File 'lib/pg/pglogical/client.rb', line 249

# Lists the current replication sets.
#
# @return [Array<String>] the set_name column of pglogical.replication_set,
#   flattened into a single list
def replication_sets
  result = typed_exec("SELECT set_name FROM pglogical.replication_set")
  result.values.flatten
end

#subscription_add_replication_set(name, set_name) ⇒ Object

Adds a replication set to a subscription. Does not sync, only activates event consumption.

Parameters:

  • name (String)

    subscription name

  • set_name (String)

    replication set name



192
193
194
195
# File 'lib/pg/pglogical/client.rb', line 192

# Adds a replication set to a subscription.
# Does not sync, only activates event consumption.
#
# @param name [String] subscription name
# @param set_name [String] replication set name
def subscription_add_replication_set(name, set_name)
  add_sql = "SELECT pglogical.alter_subscription_add_replication_set($1, $2)"
  typed_exec(add_sql, name, set_name)
end

#subscription_create(name, dsn, replication_sets = %w(default default_insert_only), sync_structure = true, sync_data = true, forward_origins = ["all"]) ⇒ Object

Creates a subscription to a provider node

Parameters:

  • name (String)

    subscription name

  • dsn (String)

    provider node connection string

  • replication_sets (Array<String>) (defaults to: %w(default default_insert_only))

    replication set names to subscribe to

  • sync_structure (Boolean) (defaults to: true)

    sync the schema structure when subscribing

  • sync_data (Boolean) (defaults to: true)

    sync the data when subscribing

  • forward_origins (Array<String>) (defaults to: ["all"])

    names of non-provider nodes to replicate changes from (cascading replication)



134
135
136
137
138
# File 'lib/pg/pglogical/client.rb', line 134

# Creates a subscription to a provider node.
#
# @param name [String] subscription name
# @param dsn [String] provider node connection string
# @param replication_sets [Array<String>] replication set names to subscribe to
# @param sync_structure [Boolean] sync the schema structure when subscribing
# @param sync_data [Boolean] sync the data when subscribing
# @param forward_origins [Array<String>] names of non-provider nodes to
#   replicate changes from (cascading replication)
def subscription_create(name, dsn, replication_sets = %w(default default_insert_only), # rubocop:disable Metrics/ParameterLists
                        sync_structure = true, sync_data = true, forward_origins = ["all"])
  create_sql = "SELECT pglogical.create_subscription($1, $2, $3, $4, $5, $6)"
  typed_exec(create_sql, name, dsn, replication_sets, sync_structure, sync_data, forward_origins)
end

#subscription_disable(name, immediate = false) ⇒ Object

Disables a subscription and disconnects it from the provider

Parameters:

  • name (String)

    subscription name

  • immediate (Boolean) (defaults to: false)

    do not wait for the current transaction before stopping



153
154
155
156
# File 'lib/pg/pglogical/client.rb', line 153

# Disables a subscription and disconnects it from the provider.
#
# @param name [String] subscription name
# @param immediate [Boolean] do not wait for the current transaction before
#   stopping
def subscription_disable(name, immediate = false)
  disable_sql = "SELECT pglogical.alter_subscription_disable($1, $2)"
  typed_exec(disable_sql, name, immediate)
end

#subscription_drop(name, ifexists = false) ⇒ Object

Disconnects the subscription and removes it

Parameters:

  • name (String)

    subscription name

  • ifexists (Boolean) (defaults to: false)

    if true an error is not thrown when the subscription does not exist



144
145
146
147
# File 'lib/pg/pglogical/client.rb', line 144

# Disconnects the subscription and removes it.
#
# @param name [String] subscription name
# @param ifexists [Boolean] if true an error is not thrown when the
#   subscription does not exist
def subscription_drop(name, ifexists = false)
  drop_sql = "SELECT pglogical.drop_subscription($1, $2)"
  typed_exec(drop_sql, name, ifexists)
end

#subscription_enable(name, immediate = false) ⇒ Object

Enables a previously disabled subscription

Parameters:

  • name (String)

    subscription name

  • immediate (Boolean) (defaults to: false)

    do not wait for the current transaction before starting



162
163
164
165
# File 'lib/pg/pglogical/client.rb', line 162

# Enables a previously disabled subscription.
#
# @param name [String] subscription name
# @param immediate [Boolean] do not wait for the current transaction before
#   starting
def subscription_enable(name, immediate = false)
  enable_sql = "SELECT pglogical.alter_subscription_enable($1, $2)"
  typed_exec(enable_sql, name, immediate)
end

#subscription_remove_replication_set(name, set_name) ⇒ Object

Removes a replication set from a subscription

Parameters:

  • name (String)

    subscription name

  • set_name (String)

    replication set name



201
202
203
204
# File 'lib/pg/pglogical/client.rb', line 201

# Removes a replication set from a subscription.
#
# @param name [String] subscription name
# @param set_name [String] replication set name
def subscription_remove_replication_set(name, set_name)
  remove_sql = "SELECT pglogical.alter_subscription_remove_replication_set($1, $2)"
  typed_exec(remove_sql, name, set_name)
end

#subscription_resync_table(name, table) ⇒ Object

Resyncs one existing table. Table will be truncated before the sync.

Parameters:

  • name (String)

    subscription name

  • table (String)

    name of the table to resync



182
183
184
185
# File 'lib/pg/pglogical/client.rb', line 182

# Resyncs one existing table.
# Table will be truncated before the sync.
#
# @param name [String] subscription name
# @param table [String] name of the table to resync
def subscription_resync_table(name, table)
  resync_sql = "SELECT pglogical.alter_subscription_resynchronize_table($1, $2)"
  typed_exec(resync_sql, name, table)
end

#subscription_show_status(name) ⇒ Hash

Shows status and basic information about a subscription

Returns:

  • (Hash)

    a hash with the subscription information keys:

    subscription_name
    status
    provider_node
    provider_dsn
    slot_name
    replication_sets
    forward_origins
    remote_replication_lsn(Log Sequence Number)
    local_replication_lsn(Log Sequence Number)
    


220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/pg/pglogical/client.rb', line 220

# Shows status and basic information about a subscription.
#
# The "replication_sets" and "forward_origins" columns arrive as brace-wrapped,
# comma-separated text (e.g. "{a,b}") and are split into arrays here.
#
# @param name [String] subscription name
# @return [Hash, nil] the first result row with keys such as
#   subscription_name, status, provider_node, provider_dsn, slot_name,
#   replication_sets, forward_origins, remote_replication_lsn and
#   local_replication_lsn; nil when the query returns no row
def subscription_show_status(name)
  sql = <<-SQL
    SELECT sub.*, stat.remote_lsn AS remote_replication_lsn, stat.local_lsn AS local_replication_lsn
    FROM pglogical.show_subscription_status($1) sub
    LEFT JOIN pg_replication_origin_status stat
      ON sub.slot_name = stat.external_id
  SQL

  status = typed_exec(sql, name).first
  # An empty result would otherwise fail with NoMethodError on nil below.
  return unless status

  %w(replication_sets forward_origins).each do |key|
    raw = status[key]
    # Drop the surrounding braces, then split; a NULL column is left as nil.
    status[key] = raw[1..-2].split(",") if raw
  end

  status
end

#subscription_sync(name, truncate = false) ⇒ Object

Syncs all unsynchronized tables in all sets in a single operation.

Command does not block

Parameters:

  • name (String)

    subscription name

  • truncate (Boolean) (defaults to: false)

    truncate the tables before syncing



172
173
174
175
# File 'lib/pg/pglogical/client.rb', line 172

# Syncs all unsynchronized tables in all sets in a single operation.
# Command does not block.
#
# @param name [String] subscription name
# @param truncate [Boolean] truncate the tables before syncing
def subscription_sync(name, truncate = false)
  sync_sql = "SELECT pglogical.alter_subscription_synchronize($1, $2)"
  typed_exec(sync_sql, name, truncate)
end

#subscriptions ⇒ Object

Shows the status of all configured subscriptions



237
238
239
240
241
# File 'lib/pg/pglogical/client.rb', line 237

# Shows the status of all configured subscriptions.
#
# Reads every sub_name from pglogical.subscription and looks up the status of
# each one in turn.
#
# @return [Array] one subscription_show_status result per subscription name
def subscriptions
  sub_names = connection.select_values("SELECT sub_name FROM pglogical.subscription")
  sub_names.map { |sub_name| subscription_show_status(sub_name) }
end

#tables_in_replication_set(set_name) ⇒ Array<String>

Lists the tables currently in the replication set

Parameters:

  • set_name (String)

    replication set name

Returns:

  • (Array<String>)

    names of the tables in the given set



317
318
319
320
321
322
323
324
325
# File 'lib/pg/pglogical/client.rb', line 317

# Lists the tables currently in the replication set.
#
# Selects set_reloid from pglogical.replication_set_relation for the set with
# the given name and flattens the rows into one list.
#
# @param set_name [String] replication set name
# @return [Array<String>] names of the tables in the given set
def tables_in_replication_set(set_name)
  tables_sql = <<-SQL
    SELECT set_reloid
    FROM pglogical.replication_set_relation
    JOIN pglogical.replication_set
      USING (set_id)
    WHERE set_name = $1
  SQL

  typed_exec(tables_sql, set_name).values.flatten
end

#wal_retained_bytes ⇒ Array<Hash<String,String>>

Reports on replication bytes of WAL being retained for each replication slot. This method must be run on the provider node.

Returns:

  • (Array<Hash<String,String>>)

    List of returned WAL bytes and replication slot names, one for each replication process



61
62
63
64
65
66
67
68
69
# File 'lib/pg/pglogical/client.rb', line 61

# Reports on replication bytes of WAL being retained for each replication slot.
# This method must be run on the provider node.
#
# PostgreSQL 10 renamed the xlog helper functions, so the function names are
# chosen from the server version reported by the connection. Servers older
# than 10 get the same SQL as before.
#
# @return [Array<Hash<String,String>>] List of returned WAL bytes and
#   replication slot names, one for each replication process
def wal_retained_bytes
  if connection.postgresql_version >= 100_000
    diff_fn = "pg_wal_lsn_diff"
    insert_fn = "pg_current_wal_insert_lsn"
  else
    diff_fn = "pg_xlog_location_diff"
    insert_fn = "pg_current_xlog_insert_location"
  end

  typed_exec(<<-SQL).to_a
    SELECT
      #{diff_fn}(#{insert_fn}(), restart_lsn) AS retained_bytes,
      slot_name
    FROM pg_replication_slots
    WHERE plugin = 'pglogical_output'
  SQL
end

#with_replication_set_lock(set_name) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
# File 'lib/pg/pglogical/client.rb', line 327

# Runs the given block inside a new transaction after selecting the named
# replication set row FOR UPDATE.
#
# @param set_name [String] replication set name
# @yield called with no arguments once the SELECT ... FOR UPDATE has been issued
# @return [Object] whatever connection.transaction returns
def with_replication_set_lock(set_name)
  connection.transaction(:requires_new => true) do
    lock_sql = <<-SQL
      SELECT *
      FROM pglogical.replication_set
      WHERE set_name = $1
      FOR UPDATE
    SQL
    typed_exec(lock_sql, set_name)
    yield
  end
end