Class: PgEasyReplicate::Stats

Inherits:
Object
  • Object
show all
Extended by:
Helper
Defined in:
lib/pg_easy_replicate/stats.rb

Constant Summary collapse

# Translates the single-character state codes read from
# pg_subscription_rel.srsubstate into the labels reported in the stats output.
REPLICATION_STATE_MAP = {
  "i" => "initializing", # table sync has not started copying yet
  "d" => "data_is_being_copied", # initial table copy in progress
  "f" => "finished_table_copy", # initial copy done
  "s" => "synchronized", # table caught up with the main apply stream
  "r" => "replicating", # steady-state replication
}.freeze

Class Method Summary collapse

Methods included from Helper

abort_with, connection_info, convert_to_array, db_name, db_user, determine_tables, internal_schema_name, internal_user_name, list_all_tables, logger, publication_name, quote_ident, restore_connections_on_source_db, secondary_source_db_url, source_db_url, subscription_name, target_db_url, test_env?, underscore, validate_table_lists

Class Method Details

.all_tables_replicating?(group_name) ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
121
122
123
124
125
126
127
# File 'lib/pg_easy_replicate/stats.rb', line 118

# Reports whether every table tracked for the group's subscription has
# reached the "replicating" state.
#
# @param group_name name of the replication group; passed straight through
#   to replication_stats
# @return [Boolean] true only when at least one table is tracked and every
#   tracked table is in the "replicating" state
def all_tables_replicating?(group_name)
  distinct_states =
    replication_stats(group_name).map { |table| table[:replication_state] }.uniq

  # No tracked tables gives [] here, so an empty subscription is reported as
  # "not replicating" rather than vacuously true.
  distinct_states == [REPLICATION_STATE_MAP["r"]]
end

.follow(group_name) ⇒ Object



37
38
39
40
41
42
# File 'lib/pg_easy_replicate/stats.rb', line 37

# Prints the group's stats over and over, pausing between refreshes.
# The loop has no exit condition of its own; it runs until something
# interrupts it.
#
# @param group_name name of the replication group to report on
# @param interval [Numeric] seconds to wait between refreshes; defaults to 1,
#   which matches the previously hard-coded delay
def follow(group_name, interval = 1)
  loop do
    # `print` is this class's own stats printer, not Kernel#print.
    print(group_name)
    sleep(interval)
  end
end

.lag_stats(group_name) ⇒ Object

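Gets replication lag statistics for the group's subscription from pg_stat_replication on the source database.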



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/pg_easy_replicate/stats.rb', line 69

# Queries pg_stat_replication on the source database for the connection
# whose application_name equals the group's subscription name, and reports
# write/flush/replay lag as pg_wal_lsn_diff of sent_lsn against each position.
#
# @param group_name name of the replication group
# @return whatever Query.run returns for this query
def lag_stats(group_name)
  subscription = subscription_name(group_name)

  lag_query = <<~SQL
    SELECT pid,
    client_addr,
    usename as user_name,
    application_name,
    state,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, write_lsn) AS write_lag,
    pg_wal_lsn_diff(sent_lsn, flush_lsn) AS flush_lag,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag
    FROM pg_stat_replication
    WHERE application_name = '#{subscription}';
  SQL

  Query.run(query: lag_query, connection_url: source_db_url)
end

.message_lsn_receipts(group_name) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/pg_easy_replicate/stats.rb', line 137

# Reads the received/latest LSN positions and message timestamps for the
# group's subscription from pg_catalog.pg_stat_subscription on the target
# database.
#
# @param group_name name of the replication group
# @return whatever Query.run returns for this query
def message_lsn_receipts(group_name)
  subscription = subscription_name(group_name)

  receipts_query = <<~SQL
    select
    received_lsn,
    last_msg_send_time,
    last_msg_receipt_time,
    latest_end_lsn,
    latest_end_time
    from
      pg_catalog.pg_stat_subscription
    WHERE subname = '#{subscription}'
  SQL

  Query.run(query: receipts_query, connection_url: target_db_url)
end

.notify(group_name, url, frequency = 10, timeout = 10) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pg_easy_replicate/stats.rb', line 44

# Repeatedly POSTs the group's stats, serialized as JSON, to a webhook URL.
#
# The loop has no exit condition of its own. Any StandardError raised while
# parsing the URL, collecting stats or sending a request ends the loop and is
# reported through abort_with.
#
# @param group_name name of the replication group to report on
# @param url [String] endpoint to POST to; use_ssl is enabled when the
#   scheme is "https"
# @param frequency [Numeric] seconds to sleep after each notification
# @param timeout [Numeric] value used for both the open and read timeouts of
#   the HTTP client
def notify(group_name, url, frequency = 10, timeout = 10)
  # The endpoint and client settings never change between iterations, so they
  # are set up once. This also means a malformed URL fails before any
  # database query is issued.
  uri = URI.parse(url)

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = (uri.scheme == "https")
  http.open_timeout = timeout
  http.read_timeout = timeout

  loop do
    stats = object(group_name)

    request = Net::HTTP::Post.new(uri.request_uri)
    request.content_type = "application/json"
    request.body = stats.to_json

    # The client is never explicitly started, so each request opens and
    # closes its own connection.
    response = http.request(request)

    puts "Notification sent: #{response.code} #{response.message}"

    sleep(frequency)
  end
rescue StandardError => e
  abort_with("Notify failed with: #{e.message}")
end

.object(group_name) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pg_easy_replicate/stats.rb', line 17

# Assembles one stats snapshot for a replication group: lag, replication
# slots, per-table replication state (plus counts per state), message LSN
# receipts and the group's lifecycle timestamps.
#
# @param group_name name of the replication group
# @return [Hash] snapshot keyed by symbol, in the order the entries are
#   added below
def object(group_name)
  table_stats = replication_stats(group_name)
  group_record = Group.find(group_name)

  snapshot = {}
  snapshot[:lag_stats] = lag_stats(group_name)
  snapshot[:replication_slots] = pg_replication_slots(group_name)
  snapshot[:replication_stats] = table_stats
  # Counts are derived from the rows fetched above, not from a second query.
  snapshot[:replication_stats_count_by_state] =
    replication_stats_count_by_state(table_stats)
  snapshot[:message_lsn_receipts] = message_lsn_receipts(group_name)
  snapshot[:sync_started_at] = group_record[:started_at]
  snapshot[:sync_failed_at] = group_record[:failed_at]
  snapshot[:switchover_completed_at] = group_record[:switchover_completed_at]
  snapshot
end

.pg_replication_slots(group_name) ⇒ Object



87
88
89
90
91
92
93
# File 'lib/pg_easy_replicate/stats.rb', line 87

# Fetches the pg_replication_slots row(s) on the source database whose
# slot_name equals the group's subscription name.
#
# @param group_name name of the replication group
# @return whatever Query.run returns for this query
def pg_replication_slots(group_name)
  slot_name = subscription_name(group_name)

  slots_query = <<~SQL
    select * from pg_replication_slots WHERE slot_name = '#{slot_name}';
  SQL

  Query.run(query: slots_query, connection_url: source_db_url)
end


33
34
35
# File 'lib/pg_easy_replicate/stats.rb', line 33

# Writes the group's stats snapshot to standard output as pretty-printed JSON.
#
# @param group_name name of the replication group
# @return [nil] the result of puts
def print(group_name)
  snapshot = object(group_name)
  puts(JSON.pretty_generate(snapshot))
end

.replication_stats(group_name) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pg_easy_replicate/stats.rb', line 95

# Lists every table tracked by the group's subscription on the target
# database, with its schema, name and replication state. The one-character
# state code from pg_subscription_rel.srsubstate is replaced in place with
# its REPLICATION_STATE_MAP label (nil when the code is not in the map).
#
# @param group_name name of the replication group
# @return the collection returned by Query.run, with each row's
#   :replication_state rewritten
def replication_stats(group_name)
  subscription = subscription_name(group_name)

  # The uneven indentation inside the heredoc is part of the generated SQL
  # text and is kept as-is.
  stats_query = <<~SQL
    SELECT
    s.subname AS subscription_name,
    c.relnamespace :: regnamespace :: text as table_schema,
    c.relname as table_name,
    rel.srsubstate as replication_state
  FROM
    pg_catalog.pg_subscription s
    JOIN pg_catalog.pg_subscription_rel rel ON rel.srsubid = s.oid
    JOIN pg_catalog.pg_class c on c.oid = rel.srrelid
  WHERE s.subname = '#{subscription}'
  SQL

  rows = Query.run(query: stats_query, connection_url: target_db_url)
  rows.each do |row|
    state_code = row[:replication_state]
    row[:replication_state] = REPLICATION_STATE_MAP[state_code]
  end
end

.replication_stats_count_by_state(stats) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/pg_easy_replicate/stats.rb', line 129

# Counts how many rows are in each replication state.
#
# @param stats collection of rows that respond to [:replication_state]
# @return [Hash] state => number of rows in that state; lookups for states
#   that never occurred return 0 because the hash is built with a default of 0
def replication_stats_count_by_state(stats)
  stats.each_with_object(Hash.new(0)) do |row, tally|
    tally[row[:replication_state]] += 1
  end
end