Class: PgSync::DataSource
- Inherits:
-
Object
- Object
- PgSync::DataSource
show all
- Includes:
- Utils
- Defined in:
- lib/pgsync/data_source.rb
Constant Summary
Constants included
from Utils
Utils::COLOR_CODES
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Utils
#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning
Constructor Details
Returns a new instance of DataSource.
7
8
9
|
# File 'lib/pgsync/data_source.rb', line 7
# Builds a data source for the given connection target.
#
# @param url [String, nil] connection target; it is only stored here and
#   is not validated or connected to at construction time
def initialize(url)
  @url = url
end
|
Instance Attribute Details
#url ⇒ Object
Returns the value of attribute url.
5
6
7
|
# File 'lib/pgsync/data_source.rb', line 5
# Reader for the connection target held in +@url+.
#
# @return [Object] the stored value, unchanged
def url
  @url
end
|
Instance Method Details
#close ⇒ Object
130
131
132
133
134
135
|
# File 'lib/pgsync/data_source.rb', line 130
# Closes the cached connection, if any, and forgets it so that a later
# call to +conn+ opens a fresh one. Does nothing when no connection is cached.
#
# @return [nil]
def close
  return unless @conn

  @conn.close
  @conn = nil
end
|
#conn ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'lib/pgsync/data_source.rb', line 113
# Returns the memoized connection, opening it on first use.
#
# A +@url+ that starts with postgres:// or postgresql:// is passed to
# PG::Connection as-is; any other value is treated as a database name.
#
# @return [PG::Connection]
# @raise [Error] when URI::InvalidURIError is raised while connecting
def conn
  @conn ||= begin
    # Supply a connect timeout only when the environment does not set one.
    ENV["PGCONNECT_TIMEOUT"] ||= "3"
    uri_form = @url =~ /\Apostgres(ql)?:\/\//
    config = uri_form ? @url : {dbname: @url}
    # Record the current concurrent_id; reconnect_if_needed compares against it.
    @concurrent_id = concurrent_id
    PG::Connection.new(config)
  rescue URI::InvalidURIError
    raise Error, "Invalid connection string. Make sure it works with `psql`"
  end
end
|
#dbname ⇒ Object
27
28
29
|
# File 'lib/pgsync/data_source.rb', line 27
# Database name taken from +conninfo+, cached once a truthy value is seen.
#
# @return [Object] the +:dbname+ entry of +conninfo+
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end
|
#execute(query, params = []) ⇒ Object
150
151
152
|
# File 'lib/pgsync/data_source.rb', line 150
# Runs a parameterized statement on the connection and materializes the
# result.
#
# @param query [String] SQL text
# @param params [Array] bind values handed to +exec_params+
# @return [Array] the result converted with +to_a+
def execute(query, params = [])
  result = conn.exec_params(query, params)
  result.to_a
end
|
#exists? ⇒ Boolean
11
12
13
|
# File 'lib/pgsync/data_source.rb', line 11
# Tells whether a connection target was supplied.
#
# @return [Boolean, nil] +true+ for a non-empty +@url+, +false+ for an
#   empty one, and +nil+ when +@url+ is +nil+
def exists?
  @url && @url.size.positive?
end
|
#host ⇒ Object
19
20
21
|
# File 'lib/pgsync/data_source.rb', line 19
# Host taken from +conninfo+, cached once a truthy value is seen.
#
# @return [Object] the +:host+ entry of +conninfo+
def host
  return @host if @host

  @host = conninfo[:host]
end
|
#last_value(seq) ⇒ Object
The sequence name comes from pg_get_serial_sequence, which returns it already quoted, so it is used in the query as-is.
66
67
68
|
# File 'lib/pgsync/data_source.rb', line 66
# Reads the +last_value+ column of a sequence.
#
# @param seq [String] sequence name; it is interpolated into the SQL
#   verbatim, so it must already be safely quoted by the caller
# @return [Object] the "last_value" field of the first returned row
def last_value(seq)
  first_row = execute("SELECT last_value FROM #{seq}").first
  first_row["last_value"]
end
|
#local? ⇒ Boolean
15
16
17
|
# File 'lib/pgsync/data_source.rb', line 15
# Tells whether the data source points at the local machine: either no
# host is reported at all, or the host is localhost / 127.0.0.1.
#
# @return [Boolean]
def local?
  current_host = host
  return true unless current_host

  %w(localhost 127.0.0.1).include?(current_host)
end
|
#max_id(table, primary_key, sql_clause = nil) ⇒ Object
57
58
59
|
# File 'lib/pgsync/data_source.rb', line 57
# Largest value of the primary key column in a table.
#
# @param table [Object] table reference, quoted via +quote_ident_full+
# @param primary_key [String] column name, quoted via +quote_ident+
# @param sql_clause [String, nil] raw SQL appended directly after the
#   table name (no separator is added)
# @return [Integer] the "max" field of the first row after +to_i+
def max_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  first_row = execute("SELECT MAX(#{column}) FROM #{relation}#{sql_clause}").first
  first_row["max"].to_i
end
|
#min_id(table, primary_key, sql_clause = nil) ⇒ Object
61
62
63
|
# File 'lib/pgsync/data_source.rb', line 61
# Smallest value of the primary key column in a table.
#
# @param table [Object] table reference, quoted via +quote_ident_full+
# @param primary_key [String] column name, quoted via +quote_ident+
# @param sql_clause [String, nil] raw SQL appended directly after the
#   table name (no separator is added)
# @return [Integer] the "min" field of the first row after +to_i+
def min_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  first_row = execute("SELECT MIN(#{column}) FROM #{relation}#{sql_clause}").first
  first_row["min"].to_i
end
|
#port ⇒ Object
23
24
25
|
# File 'lib/pgsync/data_source.rb', line 23
# Port taken from +conninfo+, cached once a truthy value is seen.
#
# @return [Object] the +:port+ entry of +conninfo+
def port
  return @port if @port

  @port = conninfo[:port]
end
|
#primary_key(table) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/pgsync/data_source.rb', line 76
# Lists the primary key column names of a table, ordered by each
# column's position in the primary key index definition.
#
# @param table [#schema, #name] table whose schema and name are bound to
#   the query's $1 and $2 placeholders
# @return [Array] the "attname" values of the matching rows
def primary_key(table)
  query = <<~SQL
    SELECT
    pg_attribute.attname,
    format_type(pg_attribute.atttypid, pg_attribute.atttypmod),
    pg_attribute.attnum,
    pg_index.indkey
    FROM
    pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
    nspname = $1 AND
    relname = $2 AND
    indrelid = pg_class.oid AND
    pg_class.relnamespace = pg_namespace.oid AND
    pg_attribute.attrelid = pg_class.oid AND
    pg_attribute.attnum = any(pg_index.indkey) AND
    indisprimary
  SQL
  key_rows = execute(query, [table.schema, table.name])
  # indkey is a space-separated list of attribute numbers; a column's
  # position in that list is its position within the key.
  ordered = key_rows.sort_by do |row|
    key_positions = row["indkey"].split(" ")
    key_positions.index(row["attnum"])
  end
  ordered.map { |row| row["attname"] }
end
|
#reconnect_if_needed ⇒ Object
Reconnects when running in a new thread or process.
138
139
140
|
# File 'lib/pgsync/data_source.rb', line 138
# Calls +reconnect+ when the +concurrent_id+ recorded for the current
# connection no longer equals the present +concurrent_id+.
#
# @return [Object, nil] the result of +reconnect+, or +nil+ when the
#   recorded id still matches
def reconnect_if_needed
  return if @concurrent_id == concurrent_id

  reconnect
end
|
#search_path ⇒ Object
142
143
144
|
# File 'lib/pgsync/data_source.rb', line 142
# Schema names reported by +current_schemas(true)+, memoized after the
# first query.
#
# @return [Array] the "schema" field of every returned row
def search_path
  @search_path ||= begin
    schema_rows = execute("SELECT unnest(current_schemas(true)) AS schema")
    schema_rows.map { |row| row["schema"] }
  end
end
|
#sequences(table, columns) ⇒ Object
53
54
55
|
# File 'lib/pgsync/data_source.rb', line 53
# Finds the sequences attached to the given columns of a table by calling
# pg_get_serial_sequence once per column in a single SELECT.
#
# @param table [Object] table reference, quoted via +quote_ident_full+
# @param columns [Array<String>] column names to look up
# @return [Array] the non-nil values of the single result row; empty when
#   +columns+ is empty
def sequences(table, columns)
  # With no columns the statement would be a bare "SELECT ", which is not
  # valid SQL; there is nothing to look up, so skip the query.
  return [] if columns.empty?

  # The table literal is identical for every column, so build it once.
  table_literal = escape(quote_ident_full(table).to_s)
  select_list = columns.map do |column|
    "pg_get_serial_sequence(#{table_literal}, #{escape(column)}) AS #{quote_ident(column)}"
  end

  execute("SELECT #{select_list.join(", ")}").first.values.compact
end
|
#server_version_num ⇒ Object
146
147
148
|
# File 'lib/pgsync/data_source.rb', line 146
# Integer form of the server's +server_version_num+ setting, memoized
# after the first query.
#
# @return [Integer]
def server_version_num
  @server_version_num ||= begin
    setting_row = execute("SHOW server_version_num").first
    setting_row["server_version_num"].to_i
  end
end
|
#table_exists?(table) ⇒ Boolean
49
50
51
|
# File 'lib/pgsync/data_source.rb', line 49
# Tells whether the given table is a member of +table_set+.
#
# @param table [Object] table to look up
# @return [Boolean]
def table_exists?(table)
  known_tables = table_set
  known_tables.include?(table)
end
|
#tables ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/pgsync/data_source.rb', line 32
# Lists every base table outside information_schema and pg_catalog as a
# Table, ordered by schema and then name. The list is memoized after the
# first query.
#
# @return [Array<Table>]
def tables
  return @tables if @tables

  query = <<~SQL
    SELECT
    table_schema AS schema,
    table_name AS table
    FROM
    information_schema.tables
    WHERE
    table_type = 'BASE TABLE' AND
    table_schema NOT IN ('information_schema', 'pg_catalog')
    ORDER BY 1, 2
  SQL
  table_rows = execute(query)
  @tables = table_rows.map { |row| Table.new(row["schema"], row["table"]) }
end
|
#transaction ⇒ Object
154
155
156
157
158
159
160
161
162
163
|
# File 'lib/pgsync/data_source.rb', line 154
# Runs the given block inside a transaction, opening one only when the
# connection's +transaction_status+ is 0; otherwise the block simply runs
# as-is.
#
# @yield the work to perform
# @return [Object] whatever the wrapped call or the block returns
def transaction
  # NOTE(review): status 0 presumably is libpq's PQTRANS_IDLE (no
  # transaction in progress) - confirm against the pg gem.
  return yield unless conn.transaction_status == 0

  conn.transaction { yield }
end
|
#triggers(table) ⇒ Object
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/pgsync/data_source.rb', line 98
# Lists the triggers defined on a table.
#
# @param table [Object] table reference; its +quote_ident_full+ form is
#   bound to $1 and cast to regclass by the query
# @return [Array] rows with "name", "internal", "enabled" and
#   "integrity" fields
def triggers(table)
  query = <<~SQL
    SELECT
    tgname AS name,
    tgisinternal AS internal,
    tgenabled != 'D' AS enabled,
    tgconstraint != 0 AS integrity
    FROM
    pg_trigger
    WHERE
    pg_trigger.tgrelid = $1::regclass
  SQL
  relation = quote_ident_full(table)
  execute(query, [relation])
end
|
#truncate(table) ⇒ Object
70
71
72
|
# File 'lib/pgsync/data_source.rb', line 70
# Empties a table with TRUNCATE ... CASCADE.
#
# @param table [Object] table reference, quoted via +quote_ident_full+
# @return [Array] whatever +execute+ returns for the statement
def truncate(table)
  relation = quote_ident_full(table)
  execute("TRUNCATE #{relation} CASCADE")
end
|