Class: PgSync::DataSource
- Inherits: Object
- Inheritance chain: Object → PgSync::DataSource
- Defined in:
- lib/pgsync/data_source.rb
Instance Attribute Summary collapse
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
- #close ⇒ Object
- #columns(table) ⇒ Object
-
#conn ⇒ Object
borrowed from ActiveRecord::ConnectionAdapters::ConnectionSpecification::ConnectionUrlResolver.
- #dump_command(tables) ⇒ Object
- #exists? ⇒ Boolean
- #fully_resolve_tables(tables) ⇒ Object
-
#initialize(source) ⇒ DataSource
constructor
A new instance of DataSource.
- #last_value(seq) ⇒ Object
- #local? ⇒ Boolean
- #max_id(table, primary_key, sql_clause = nil) ⇒ Object
- #min_id(table, primary_key, sql_clause = nil) ⇒ Object
- #primary_key(table) ⇒ Object
- #restore_command ⇒ Object
- #schema ⇒ Object
- #search_path ⇒ Object
- #sequences(table, columns) ⇒ Object
- #table_exists?(table) ⇒ Boolean
- #tables ⇒ Object
- #to_url ⇒ Object
- #truncate(table) ⇒ Object
- #uri ⇒ Object
Constructor Details
#initialize(source) ⇒ DataSource
5 6 7 |
# File 'lib/pgsync/data_source.rb', line 5
# Creates a data source from +source+.
#
# The argument is passed to +resolve_url+ (defined elsewhere in this class)
# and the value it returns is stored in +@url+.
#
# @param source [Object] value accepted by +resolve_url+
def initialize(source)
  @url = resolve_url(source)
end
Instance Attribute Details
#url ⇒ Object (readonly)
Returns the value of attribute url.
3 4 5 |
# File 'lib/pgsync/data_source.rb', line 3
# Read-only accessor for the value stored in +@url+ by the constructor.
#
# @return [Object] the current value of +@url+ (nil when it was never set)
def url
  @url
end
Instance Method Details
#close ⇒ Object
42 43 44 45 46 47 |
# File 'lib/pgsync/data_source.rb', line 42
# Closes the memoized connection, if one was ever opened, and forgets it.
#
# Does nothing when +@conn+ is unset, so calling it repeatedly is harmless.
#
# @return [nil]
def close
  return unless @conn

  conn.close
  @conn = nil
end
#columns(table) ⇒ Object
55 56 57 58 |
# File 'lib/pgsync/data_source.rb', line 55
# Lists the column names of +table+ as reported by
# +information_schema.columns+.
#
# @param table [String] table name; it is split once on "." and the pieces
#   are bound to $1 (table_schema) and $2 (table_name)
# @return [Array] the "column_name" value of every returned row
def columns(table)
  schema_and_name = table.split(".", 2)
  sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2"
  rows = execute(sql, schema_and_name)
  rows.map { |row| row["column_name"] }
end
#conn ⇒ Object
borrowed from ActiveRecord::ConnectionAdapters::ConnectionSpecification::ConnectionUrlResolver
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/pgsync/data_source.rb', line 103
# Lazily opens (and memoizes in +@conn+) a PG connection built from +uri+.
#
# The URL-to-config translation is borrowed from
# ActiveRecord::ConnectionAdapters::ConnectionSpecification::ConnectionUrlResolver:
# each URI component becomes a config entry, blank entries are dropped, and
# String values are percent-decoded.
#
# NOTE(review): the rescue clause was truncated in the listing this was
# recovered from ("raise PgSync::Error, e."); +e.message+ is the presumed
# original - confirm against the real source file.
#
# @return [PG::Connection] the shared connection
# @raise [PgSync::Error] when PG reports PG::ConnectionBad
def conn
  @conn ||= begin
    parser = URI::Parser.new
    settings = {
      host: uri.host,
      port: uri.port,
      dbname: uri.path.sub(/\A\//, ""),
      user: uri.user,
      password: uri.password,
      connect_timeout: 3
    }
    # Build a fresh hash instead of mutating one while iterating over it:
    # skip blank values, decode strings, pass everything else through as-is.
    config = settings.each_with_object({}) do |(key, value), result|
      next if value.to_s.empty?

      result[key] = value.is_a?(String) ? parser.unescape(value) : value
    end
    PG::Connection.new(config)
  rescue PG::ConnectionBad => e
    log
    raise PgSync::Error, e.message
  end
end
#dump_command(tables) ⇒ Object
124 125 126 127 |
# File 'lib/pgsync/data_source.rb', line 124
# Builds the +pg_dump+ command line that dumps the schema (no data, owners
# or ACLs) of the given tables from this data source.
#
# @param tables [Hash] only the keys are used; each is passed through
#   +quote_ident_full+, shell-escaped and emitted as a "-t" flag
# @return [String] the full command, ending with +to_url+
def dump_command(tables)
  table_flags = tables.keys.map do |name|
    "-t #{Shellwords.escape(quote_ident_full(name))}"
  end
  "pg_dump -Fc --verbose --schema-only --no-owner --no-acl #{table_flags.join(" ")} #{to_url}"
end
#exists? ⇒ Boolean
9 10 11 |
# File 'lib/pgsync/data_source.rb', line 9
# Tells whether this data source has a non-empty URL.
#
# @return [Boolean, nil] true for a non-empty +@url+, false for an empty
#   one, and +@url+ itself (nil) when it is unset
def exists?
  @url && !@url.empty?
end
#fully_resolve_tables(tables) ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/pgsync/data_source.rb', line 135
# Rewrites the keys of +tables+ to the fully qualified names returned by
# +#tables+ where a match exists.
#
# Existing tables are grouped by the part of their name after the first ".".
# Within each group the winner is the table whose schema comes earliest in
# +search_path+; schemas not on the search path rank last (1000000), and
# remaining ties are broken by the full name. Keys without a match are left
# unchanged.
#
# @param tables [Hash, Array] key/value pairs keyed by table name
# @return [Hash] the same values keyed by the resolved table name
def fully_resolve_tables(tables)
  schema_rank = search_path.each_with_index.to_h
  preferred = {}
  # self.tables: the +tables+ parameter shadows the method of the same name.
  self.tables.group_by { |name| name.split(".", 2)[-1] }.each do |bare_name, candidates|
    preferred[bare_name] = candidates.min_by do |name|
      [schema_rank[name.split(".", 2)[0]] || 1000000, name]
    end
  end
  tables.map { |key, value| [preferred[key] || key, value] }.to_h
end
#last_value(seq) ⇒ Object
72 73 74 |
# File 'lib/pgsync/data_source.rb', line 72
# Reads the +last_value+ column of the sequence +seq+.
#
# +seq+ is interpolated into the SQL verbatim, so it must already be a
# safely quoted identifier.
#
# @param seq [String] sequence name as it should appear in the FROM clause
# @return [Object] the "last_value" field of the first returned row
def last_value(seq)
  rows = execute("select last_value from #{seq}")
  first_row = rows[0]
  first_row["last_value"]
end
#local? ⇒ Boolean
13 14 15 |
# File 'lib/pgsync/data_source.rb', line 13
# Tells whether the host of +uri+ is one of the two recognised local names,
# "localhost" or "127.0.0.1".
#
# @return [Boolean]
def local?
  host = uri.host
  host == "localhost" || host == "127.0.0.1"
end
#max_id(table, primary_key, sql_clause = nil) ⇒ Object
64 65 66 |
# File 'lib/pgsync/data_source.rb', line 64
# Returns the largest value of +primary_key+ in +table+.
#
# @param table [String] table name, quoted via +quote_ident_full+
# @param primary_key [String] column name, quoted via +quote_ident+
# @param sql_clause [String, nil] raw SQL appended verbatim after the table
#   name (the caller supplies any leading space)
# @return [Integer] the "max" field of the first row after +to_i+, so a NULL
#   result becomes 0
def max_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  rows = execute("SELECT MAX(#{column}) FROM #{relation}#{sql_clause}")
  rows[0]["max"].to_i
end
#min_id(table, primary_key, sql_clause = nil) ⇒ Object
68 69 70 |
# File 'lib/pgsync/data_source.rb', line 68
# Returns the smallest value of +primary_key+ in +table+.
#
# @param table [String] table name, quoted via +quote_ident_full+
# @param primary_key [String] column name, quoted via +quote_ident+
# @param sql_clause [String, nil] raw SQL appended verbatim after the table
#   name (the caller supplies any leading space)
# @return [Integer] the "min" field of the first row after +to_i+, so a NULL
#   result becomes 0
def min_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  rows = execute("SELECT MIN(#{column}) FROM #{relation}#{sql_clause}")
  rows[0]["min"].to_i
end
#primary_key(table) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/pgsync/data_source.rb', line 81
# Looks up the primary-key column of +table+ in the Postgres catalogs.
#
# The listing this was recovered from had lost the heredoc opener, turning
# the SQL and the rest of the method into a single string literal; the
# heredoc is restored here so the query is actually executed.
#
# @param table [String] table name; the part before the first "." is bound
#   to $1 (nspname) and +quote_ident_full(table)+ to $2 (cast to regclass)
# @return [String, nil] the "attname" of the first matching row, or nil when
#   no row is returned
def primary_key(table)
  query = <<-SQL
    SELECT
      pg_attribute.attname,
      format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
    FROM
      pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
      pg_class.oid = $2::regclass AND
      indrelid = pg_class.oid AND
      nspname = $1 AND
      pg_class.relnamespace = pg_namespace.oid AND
      pg_attribute.attrelid = pg_class.oid AND
      pg_attribute.attnum = any(pg_index.indkey) AND
      indisprimary
  SQL
  # Only the first row is consulted, so a composite key yields one column.
  row = execute(query, [table.split(".", 2)[0], quote_ident_full(table)])[0]
  row && row["attname"]
end
#restore_command ⇒ Object
129 130 131 132 133 |
# File 'lib/pgsync/data_source.rb', line 129
# Builds the +pg_restore+ command line targeting this data source.
#
# "--if-exists" is added only when the locally installed psql reports
# version 9.4.0 or newer.
#
# The version is taken as the first dotted run of digits on the first line
# of `psql --version`. Taking the last whitespace-separated token instead
# breaks on packaged builds whose banner ends with a distribution suffix
# such as "12.3 (Ubuntu 12.3-1.pgdg18.04+1)": Gem::Version rejects that
# token. Pre-release tags ("10beta2") are handled by the same rule. When no
# version can be found it is treated as 0, so the flag is left out.
#
# @return [String] the full command, ending with "-d" and +to_url+
def restore_command
  banner = `psql --version`.lines[0].to_s
  psql_version = Gem::Version.new(banner[/\d+(?:\.\d+)*/] || "0")
  if_exists = psql_version >= Gem::Version.new("9.4.0")
  "pg_restore --verbose --no-owner --no-acl --clean #{if_exists ? "--if-exists" : nil} -d #{to_url}"
end
#schema ⇒ Object
28 29 30 |
# File 'lib/pgsync/data_source.rb', line 28 def schema @schema ||= CGI.parse(uri.query.to_s)["schema"][0] end |
#search_path ⇒ Object
145 146 147 |
# File 'lib/pgsync/data_source.rb', line 145
# Returns the schemas reported by +current_schemas(true)+, in order.
#
# The first and last characters of the returned text are dropped
# (presumably the braces of a Postgres array literal such as
# "{pg_catalog,public}" - confirm) and the rest is split on commas.
#
# @return [Array<String>]
def search_path
  row = execute("SELECT current_schemas(true)")[0]
  array_text = row["current_schemas"]
  array_text[1..-2].split(",")
end
#sequences(table, columns) ⇒ Object
60 61 62 |
# File 'lib/pgsync/data_source.rb', line 60
# Returns the sequences backing the given columns of +table+, using
# +pg_get_serial_sequence+ once per column.
#
# Each result column is aliased with the quoted column name. An unquoted
# alias is a syntax error for names containing spaces or punctuation, and is
# case-folded by Postgres so two columns differing only in case would
# collapse into one result key. With no columns there is nothing to select
# ("SELECT " alone is invalid SQL), so an empty list is returned without
# touching the database.
#
# @param table [String] table name, quoted via +quote_ident_full+ and then
#   passed through +escape+
# @param columns [Array<String>] column names to inspect
# @return [Array] the non-nil values of the single result row
def sequences(table, columns)
  return [] if columns.empty?

  table_literal = escape(quote_ident_full(table).to_s)
  selections = columns.map do |column|
    "pg_get_serial_sequence(#{table_literal}, #{escape(column)}) AS #{quote_ident(column)}"
  end
  execute("SELECT #{selections.join(", ")}")[0].values.compact
end
#table_exists?(table) ⇒ Boolean
37 38 39 40 |
# File 'lib/pgsync/data_source.rb', line 37
# Tells whether +table+ is listed in +information_schema.tables+.
#
# @param table [String] table name; it is split once on "." and the pieces
#   are bound to $1 (table_schema) and $2 (table_name)
# @return [Boolean] true when the lookup returns at least one row
def table_exists?(table)
  schema_and_name = table.split(".", 2)
  sql = "SELECT 1 FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2"
  matches = execute(sql, schema_and_name)
  matches.size > 0
end
#tables ⇒ Object
32 33 34 35 |
# File 'lib/pgsync/data_source.rb', line 32
# Lists every table outside +information_schema+ and +pg_catalog+ as
# "schema.table", ordered by schema and then table name (the ordering is
# done by the query itself).
#
# @return [Array<String>]
def tables
  sql = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('information_schema', 'pg_catalog') ORDER BY 1, 2"
  rows = execute(sql)
  rows.map { |row| [row["schemaname"], row["tablename"]].join(".") }
end
#to_url ⇒ Object
49 50 51 52 53 |
# File 'lib/pgsync/data_source.rb', line 49
# Renders +uri+ as a string with its query string removed.
#
# Works on a copy, so the memoized +uri+ keeps its query.
#
# @return [String]
def to_url
  stripped = uri.dup
  stripped.query = nil
  stripped.to_s
end
#truncate(table) ⇒ Object
76 77 78 |
# File 'lib/pgsync/data_source.rb', line 76
# Issues TRUNCATE ... CASCADE for +table+.
#
# @param table [String] table name, quoted via +quote_ident_full+
# @return [Object] whatever +execute+ returns
def truncate(table)
  target = quote_ident_full(table)
  execute("TRUNCATE #{target} CASCADE")
end
#uri ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/pgsync/data_source.rb', line 17
# Parses +@url+ into a URI, filling in defaults, and memoizes it in +@uri+.
#
# Missing parts default to scheme "postgres", host "localhost" and port
# 5432. A path that does not start with "/" gets one prepended, so a bare
# database name such as "mydb" ends up as the path "/mydb".
#
# @return [URI::Generic]
def uri
  return @uri if @uri

  parsed = URI.parse(@url)
  parsed.scheme = "postgres" unless parsed.scheme
  parsed.host = "localhost" unless parsed.host
  parsed.port = 5432 unless parsed.port
  path = parsed.path
  parsed.path = "/#{path}" if path && path[0] != "/"
  @uri = parsed
end