Class: PgEasyReplicate::Query

Inherits:
Object
  • Object
show all
Extended by:
Helper
Defined in:
lib/pg_easy_replicate/query.rb

Class Method Summary collapse

Methods included from Helper

abort_with, connection_info, db_name, db_user, determine_tables, internal_schema_name, internal_user_name, list_all_tables, logger, publication_name, quote_ident, secondary_source_db_url, source_db_url, subscription_name, target_db_url, test_env?, underscore

Class Method Details

.connect(connection_url:, user: internal_user_name, schema: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/pg_easy_replicate/query.rb', line 42

def connect(connection_url:, user: internal_user_name, schema: nil)
  c =
    Sequel.connect(
      connection_url,
      user: user,
      logger: ENV.fetch("DEBUG", nil) ? logger : nil,
      search_path: schema,
    )
  logger.debug("Connection established")
  c
end

.run(query:, connection_url:, user: internal_user_name, schema: nil, transaction: true, using_vacuum_analyze: false) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pg_easy_replicate/query.rb', line 8

def run(
  query:,
  connection_url:,
  user: internal_user_name,
  schema: nil,
  transaction: true,
  using_vacuum_analyze: false
)
  conn =
    connect(connection_url: connection_url, schema: schema, user: user)
  timeout ||= ENV["PG_EASY_REPLICATE_STATEMENT_TIMEOUT"] || "5s"
  if transaction
    r =
      conn.transaction do
        conn.run("SET search_path to #{quote_ident(schema)}") if schema
        conn.run("SET statement_timeout to '#{timeout}'")
        conn.fetch(query).to_a
      end
  else
    conn.run("SET search_path to #{quote_ident(schema)}") if schema
    if using_vacuum_analyze
      conn.run("SET statement_timeout=0")
    else
      conn.run("SET statement_timeout to '5s'")
    end
    r = conn.fetch(query).to_a
  end
  conn.disconnect
  r
ensure
  conn&.fetch("RESET statement_timeout")
  conn&.disconnect
end