Class: PgEasyReplicate::Query
- Inherits:
-
Object
- Object
- PgEasyReplicate::Query
show all
- Extended by:
- Helper
- Defined in:
- lib/pg_easy_replicate/query.rb
Class Method Summary
collapse
Methods included from Helper
abort_with, connection_info, db_name, db_user, determine_tables, internal_schema_name, internal_user_name, list_all_tables, logger, publication_name, quote_ident, secondary_source_db_url, source_db_url, subscription_name, target_db_url, test_env?, underscore
Class Method Details
.connect(connection_url:, user: internal_user_name, schema: nil) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/pg_easy_replicate/query.rb', line 42
# Opens a Sequel database handle for +connection_url+.
#
# @param connection_url [String] URL handed to +Sequel.connect+
# @param user [String] user passed to Sequel (defaults to +internal_user_name+)
# @param schema [String, nil] forwarded to Sequel as the +search_path+ option
# @return [Object] whatever +Sequel.connect+ returns
def connect(connection_url:, user: internal_user_name, schema: nil)
  # Statement logging is opt-in: the logger is only attached when DEBUG is set.
  sql_logger = logger if ENV.fetch("DEBUG", nil)

  Sequel
    .connect(
      connection_url,
      user: user,
      logger: sql_logger,
      search_path: schema,
    )
    .tap { logger.debug("Connection established") }
end
|
.run(query:, connection_url:, user: internal_user_name, schema: nil, transaction: true, using_vacuum_analyze: false) ⇒ Object
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/pg_easy_replicate/query.rb', line 8
# Executes +query+ on a freshly opened connection and returns all rows.
#
# The statement timeout comes from the PG_EASY_REPLICATE_STATEMENT_TIMEOUT
# environment variable (default "5s") and is applied on both the
# transactional and the non-transactional path. When +using_vacuum_analyze+
# is true on the non-transactional path the timeout is disabled instead.
#
# @param query [String] SQL to execute
# @param connection_url [String] database URL, forwarded to +connect+
# @param user [String] user forwarded to +connect+
# @param schema [String, nil] when given, +search_path+ is set to this
#   schema (quoted with +quote_ident+) before the query runs
# @param transaction [Boolean] wrap the statements in +conn.transaction+
# @param using_vacuum_analyze [Boolean] only consulted when +transaction+ is
#   false; sets +statement_timeout+ to 0
# @return [Array] the result of +conn.fetch(query).to_a+
# @raise whatever +connect+ or the executed statements raise
def run(
  query:,
  connection_url:,
  user: internal_user_name,
  schema: nil,
  transaction: true,
  using_vacuum_analyze: false
)
  conn = connect(connection_url: connection_url, schema: schema, user: user)
  # NOTE(review): the value is interpolated into SQL verbatim; it is assumed
  # to be operator-controlled configuration, not user input.
  timeout = ENV.fetch("PG_EASY_REPLICATE_STATEMENT_TIMEOUT", "5s")

  if transaction
    conn.transaction do
      conn.run("SET search_path to #{quote_ident(schema)}") if schema
      conn.run("SET statement_timeout to '#{timeout}'")
      conn.fetch(query).to_a
    end
  else
    conn.run("SET search_path to #{quote_ident(schema)}") if schema
    if using_vacuum_analyze
      conn.run("SET statement_timeout=0")
    else
      conn.run("SET statement_timeout to '#{timeout}'")
    end
    conn.fetch(query).to_a
  end
ensure
  # conn is nil when connect itself raised.
  if conn
    begin
      # Must be #run: #fetch only builds a dataset and never sends the SQL.
      conn.run("RESET statement_timeout")
    rescue StandardError => e
      # Best-effort cleanup; never mask the error raised by the query itself.
      logger.debug("Could not reset statement_timeout: #{e.message}")
    end
    conn.disconnect
  end
end
|