Class: PgEasyReplicate::Stats

Inherits:
Object
  • Object
show all
Extended by:
Helper
Defined in:
lib/pg_easy_replicate/stats.rb

Constant Summary collapse

# Maps the single-character state codes read from
# pg_subscription_rel.srsubstate to the descriptive names reported in the
# replication stats. Frozen so the shared table cannot be mutated.
REPLICATION_STATE_MAP =
{
  "i" => "initializing",
  "d" => "data_is_being_copied",
  "f" => "finished_table_copy",
  "s" => "synchronized",
  "r" => "replicating",
}.freeze

Class Method Summary collapse

Methods included from Helper

abort_with, connection_info, db_name, db_user, determine_tables, internal_schema_name, internal_user_name, list_all_tables, logger, publication_name, quote_ident, secondary_source_db_url, source_db_url, subscription_name, target_db_url, test_env?, underscore

Class Method Details

.all_tables_replicating?(group_name) ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
95
96
97
98
99
100
101
# File 'lib/pg_easy_replicate/stats.rb', line 92

# Reports whether every table tracked for the group is in the
# "replicating" state.
#
# @param group_name name of the group whose replication stats are checked
# @return [Boolean] true only when at least one table is reported and the
#   single distinct state equals REPLICATION_STATE_MAP["r"]
def all_tables_replicating?(group_name)
  distinct_states =
    replication_stats(group_name)
      .map { |table| table[:replication_state] }
      .uniq

  # An empty result gives [] here, so a group without any tracked tables
  # is never reported as replicating.
  distinct_states == [REPLICATION_STATE_MAP["r"]]
end

.follow(group_name) ⇒ Object



35
36
37
38
39
40
# File 'lib/pg_easy_replicate/stats.rb', line 35

# Repeatedly prints the stats for a group, sleeping one second after each
# print. The loop has no break condition of its own.
#
# @param group_name name of the group passed through to print
def follow(group_name)
  refresh_interval = 1 # seconds to wait between consecutive prints

  loop do
    print(group_name)
    sleep(refresh_interval)
  end
end

.lag_stats(group_name) ⇒ Object

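Get replication lag statistics for the group's subscription from pg_stat_replication on the source database.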



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/pg_easy_replicate/stats.rb', line 43

# Queries pg_stat_replication on the source database for the row(s) whose
# application_name equals the group's subscription name, selecting the
# write/flush/replay lag computed with pg_wal_lsn_diff against sent_lsn.
#
# NOTE(review): the subscription name is interpolated into the SQL text
# unescaped — confirm subscription_name only yields safe characters.
#
# @param group_name name of the group whose subscription is looked up
# @return whatever Query.run returns for the statement
def lag_stats(group_name)
  Query.run(query: <<~SQL, connection_url: source_db_url)
    SELECT pid,
    client_addr,
    usename as user_name,
    application_name,
    state,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, write_lsn) AS write_lag,
    pg_wal_lsn_diff(sent_lsn, flush_lsn) AS flush_lag,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag
    FROM pg_stat_replication
    WHERE application_name = '#{subscription_name(group_name)}';
  SQL
end

.message_lsn_receipts(group_name) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/pg_easy_replicate/stats.rb', line 111

# Reads the received/latest LSN values and the last message send/receipt
# timestamps for the group's subscription from
# pg_catalog.pg_stat_subscription on the target database.
#
# NOTE(review): the subscription name is interpolated into the SQL text
# unescaped — confirm subscription_name only yields safe characters.
#
# @param group_name name of the group whose subscription is looked up
# @return whatever Query.run returns for the statement
def message_lsn_receipts(group_name)
  receipts_query = <<~SQL
    select
    received_lsn,
    last_msg_send_time,
    last_msg_receipt_time,
    latest_end_lsn,
    latest_end_time
    from
      pg_catalog.pg_stat_subscription
    WHERE subname = '#{subscription_name(group_name)}'
  SQL

  Query.run(query: receipts_query, connection_url: target_db_url)
end

.object(group_name) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/pg_easy_replicate/stats.rb', line 15

# Builds the full stats hash for a group: lag stats, replication slots,
# per-table replication stats and their per-state counts, message LSN
# receipts, plus three timestamps read from the group record.
#
# @param group_name name of the group to report on
# @return [Hash] the collected stats, keyed by symbol
def object(group_name)
  table_stats = replication_stats(group_name)
  group_record = Group.find(group_name)

  # Entries are added in the order they should appear in the output.
  summary = {}
  summary[:lag_stats] = lag_stats(group_name)
  summary[:replication_slots] = pg_replication_slots(group_name)
  summary[:replication_stats] = table_stats
  summary[:replication_stats_count_by_state] =
    replication_stats_count_by_state(table_stats)
  summary[:message_lsn_receipts] = message_lsn_receipts(group_name)
  summary[:sync_started_at] = group_record[:started_at]
  summary[:sync_failed_at] = group_record[:failed_at]
  summary[:switchover_completed_at] = group_record[:switchover_completed_at]
  summary
end

.pg_replication_slots(group_name) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/pg_easy_replicate/stats.rb', line 61

# Selects every column from pg_replication_slots on the source database
# for the slot whose name equals the group's subscription name.
#
# NOTE(review): the subscription name is interpolated into the SQL text
# unescaped — confirm subscription_name only yields safe characters.
#
# @param group_name name of the group whose slot is looked up
# @return whatever Query.run returns for the statement
def pg_replication_slots(group_name)
  Query.run(query: <<~SQL, connection_url: source_db_url)
    select * from pg_replication_slots WHERE slot_name = '#{subscription_name(group_name)}';
  SQL
end


31
32
33
# File 'lib/pg_easy_replicate/stats.rb', line 31

# Writes the group's stats hash to standard output as pretty-printed JSON.
#
# @param group_name name of the group passed through to object
def print(group_name)
  stats_json = JSON.pretty_generate(object(group_name))
  puts stats_json
end

.replication_stats(group_name) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pg_easy_replicate/stats.rb', line 69

# Lists the tables attached to the group's subscription on the target
# database (subscription name, schema, table name, state) and replaces
# each row's single-character state code with its descriptive name from
# REPLICATION_STATE_MAP. Codes missing from the map become nil.
#
# NOTE(review): the subscription name is interpolated into the SQL text
# unescaped — confirm subscription_name only yields safe characters.
#
# @param group_name name of the group whose subscription is inspected
# @return the collection returned by Query.run, with each row's
#   :replication_state rewritten in place
def replication_stats(group_name)
  table_states_query = <<~SQL
    SELECT
    s.subname AS subscription_name,
    c.relnamespace :: regnamespace :: text as table_schema,
    c.relname as table_name,
    rel.srsubstate as replication_state
  FROM
    pg_catalog.pg_subscription s
    JOIN pg_catalog.pg_subscription_rel rel ON rel.srsubid = s.oid
    JOIN pg_catalog.pg_class c on c.oid = rel.srrelid
  WHERE s.subname = '#{subscription_name(group_name)}'
  SQL

  rows = Query.run(query: table_states_query, connection_url: target_db_url)

  # each returns its receiver, so the same (now translated) rows come back.
  rows.each do |row|
    state_code = row[:replication_state]
    row[:replication_state] = REPLICATION_STATE_MAP[state_code]
  end
end

.replication_stats_count_by_state(stats) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/pg_easy_replicate/stats.rb', line 103

# Counts how many stats rows are in each replication state.
#
# @param stats collection of rows, each readable with [:replication_state]
# @return [Hash] state => number of rows in that state; the hash defaults
#   to 0 for states that were never seen
def replication_stats_count_by_state(stats)
  stats.each_with_object(Hash.new(0)) do |row, totals|
    totals[row[:replication_state]] += 1
  end
end