Class: PgSync::DataSource
- Inherits:
- Object
- Inheritance chain: Object → PgSync::DataSource
- Defined in:
- lib/pgsync/data_source.rb
Instance Attribute Summary collapse
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
- #close ⇒ Object
- #columns(table) ⇒ Object
- #conn ⇒ Object
- #dbname ⇒ Object
- #dump_command(tables) ⇒ Object
- #exists? ⇒ Boolean
- #fully_resolve_tables(tables) ⇒ Object
- #host ⇒ Object
-
#initialize(source, timeout: 3) ⇒ DataSource
constructor
A new instance of DataSource.
- #last_value(seq) ⇒ Object
- #local? ⇒ Boolean
- #max_id(table, primary_key, sql_clause = nil) ⇒ Object
- #min_id(table, primary_key, sql_clause = nil) ⇒ Object
- #port ⇒ Object
- #primary_key(table) ⇒ Object
- #restore_command ⇒ Object
- #search_path ⇒ Object
- #sequences(table, columns) ⇒ Object
- #table_exists?(table) ⇒ Boolean
-
#tables ⇒ Object
Gets visible tables.
- #truncate(table) ⇒ Object
Constructor Details
#initialize(source, timeout: 3) ⇒ DataSource
Returns a new instance of DataSource.
5 6 7 8 |
# File 'lib/pgsync/data_source.rb', line 5
# Builds a data source from a connection reference.
#
# @param source the value passed to resolve_url to obtain the stored URL
# @param timeout the value stored in @timeout (defaults to 3)
def initialize(source, timeout: 3)
  @url, @timeout = resolve_url(source), timeout
end
Instance Attribute Details
#url ⇒ Object (readonly)
Returns the value of attribute url.
3 4 5 |
# File 'lib/pgsync/data_source.rb', line 3
# Read-only accessor.
#
# @return the value of the url attribute (@url)
def url
  @url
end
Instance Method Details
#close ⇒ Object
104 105 106 107 108 109 |
# File 'lib/pgsync/data_source.rb', line 104
# Closes the cached connection, if any, and clears the cache.
# Does nothing when no connection is cached.
#
# @return [nil]
def close
  return unless @conn

  @conn.close
  @conn = nil
end
#columns(table) ⇒ Object
42 43 44 45 |
# File 'lib/pgsync/data_source.rb', line 42
# Lists the column names of a table via information_schema.columns.
#
# @param table [String] table name; it is split on the first "." and the
#   pieces are bound to $1 (schema) and $2 (table name)
# @return the "column_name" value of every returned row
def columns(table)
  schema_and_name = table.split(".", 2)
  rows = execute(
    "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2",
    schema_and_name
  )
  rows.map { |row| row["column_name"] }
end
#conn ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/pgsync/data_source.rb', line 88
# Returns the memoized PG connection, opening it on first use.
#
# Sets ENV["PGCONNECT_TIMEOUT"] from @timeout unless the variable is
# already set. A URL starting with postgres:// or postgresql:// is passed
# to PG::Connection.new as-is; anything else is treated as a database name.
#
# @raise [Error] "Invalid connection string" when URI::InvalidURIError is raised
def conn
  @conn ||=
    begin
      ENV["PGCONNECT_TIMEOUT"] ||= @timeout.to_s
      looks_like_uri = @url =~ /\Apostgres(ql)?:\/\//
      config = looks_like_uri ? @url : {dbname: @url}
      PG::Connection.new(config)
    rescue URI::InvalidURIError
      raise Error, "Invalid connection string"
    end
end
#dbname ⇒ Object
26 27 28 |
# File 'lib/pgsync/data_source.rb', line 26
# Memoized lookup of conninfo[:dbname].
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end
#dump_command(tables) ⇒ Object
111 112 113 114 |
# File 'lib/pgsync/data_source.rb', line 111
# Builds the pg_dump shell command that dumps the schema only.
#
# The connection URL is shell-escaped like the table names are, so URLs
# containing shell metacharacters (e.g. "&" between query parameters)
# reach pg_dump as a single, intact argument.
#
# @param tables [Hash, nil] hash whose keys are table names; each key becomes
#   a "-t" argument. nil adds no "-t" arguments.
# @return [String] the command line
def dump_command(tables)
  table_args =
    if tables
      tables.keys.map { |t| "-t #{Shellwords.escape(quote_ident_full(t))}" }.join(" ")
    else
      ""
    end
  "pg_dump -Fc --verbose --schema-only --no-owner --no-acl #{table_args} -d #{Shellwords.escape(@url)}"
end
#exists? ⇒ Boolean
10 11 12 |
# File 'lib/pgsync/data_source.rb', line 10
# Tells whether a non-empty URL is stored in @url.
#
# @return the falsy @url itself when it is nil/false, otherwise whether
#   its size is greater than zero
def exists?
  return @url unless @url

  @url.size > 0
end
#fully_resolve_tables(tables) ⇒ Object
121 122 123 124 125 126 127 128 129 |
# File 'lib/pgsync/data_source.rb', line 121
# Replaces schema-less keys of +tables+ with a fully qualified table name.
#
# For every bare table name among the visible tables, the candidate whose
# schema comes first in the search path wins; schemas not in the search
# path rank last, and ties are broken by the qualified name itself.
# Keys that match no bare table name are kept unchanged.
#
# @param tables enumerable of [key, value] pairs
# @return [Hash] the same values under the resolved keys
def fully_resolve_tables(tables)
  schema_rank = search_path.each_with_index.to_h
  preferred = {}
  self.tables.group_by { |name| name.split(".", 2)[-1] }.each do |bare_name, candidates|
    preferred[bare_name] = candidates.min_by do |name|
      [schema_rank[name.split(".", 2)[0]] || 1000000, name]
    end
  end
  tables.map { |key, value| [preferred[key] || key, value] }.to_h
end
#host ⇒ Object
18 19 20 |
# File 'lib/pgsync/data_source.rb', line 18
# Memoized lookup of conninfo[:host].
def host
  return @host if @host

  @host = conninfo[:host]
end
#last_value(seq) ⇒ Object
59 60 61 |
# File 'lib/pgsync/data_source.rb', line 59
# Reads last_value from a sequence.
#
# @param seq sequence name, interpolated into the query as-is
#   (NOTE(review): presumably already quoted by the caller; confirm)
# @return the "last_value" field of the first returned row
def last_value(seq)
  rows = execute("select last_value from #{seq}")
  rows[0]["last_value"]
end
#local? ⇒ Boolean
14 15 16 |
# File 'lib/pgsync/data_source.rb', line 14
# Tells whether the data source points at the local machine: either no
# host is set, or the host is "localhost" or "127.0.0.1".
#
# @return [Boolean]
def local?
  return true unless host

  %w(localhost 127.0.0.1).include?(host)
end
#max_id(table, primary_key, sql_clause = nil) ⇒ Object
51 52 53 |
# File 'lib/pgsync/data_source.rb', line 51
# Largest value of the primary key column in a table.
#
# @param table table name, quoted with quote_ident_full
# @param primary_key column name, quoted with quote_ident
# @param sql_clause optional SQL appended verbatim after the table name
# @return [Integer] the "max" field of the first row converted with to_i
def max_id(table, primary_key, sql_clause = nil)
  sql = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}"
  first_row = execute(sql)[0]
  first_row["max"].to_i
end
#min_id(table, primary_key, sql_clause = nil) ⇒ Object
55 56 57 |
# File 'lib/pgsync/data_source.rb', line 55
# Smallest value of the primary key column in a table.
#
# @param table table name, quoted with quote_ident_full
# @param primary_key column name, quoted with quote_ident
# @param sql_clause optional SQL appended verbatim after the table name
# @return [Integer] the "min" field of the first row converted with to_i
def min_id(table, primary_key, sql_clause = nil)
  sql = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}"
  first_row = execute(sql)[0]
  first_row["min"].to_i
end
#port ⇒ Object
22 23 24 |
# File 'lib/pgsync/data_source.rb', line 22
# Memoized lookup of conninfo[:port].
def port
  return @port if @port

  @port = conninfo[:port]
end
#primary_key(table) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/pgsync/data_source.rb', line 68
# Looks up a primary-key column of a table from the PostgreSQL catalogs.
#
# @param table [String] table name; the part before the first "." is bound
#   to $1 (nspname) and the quote_ident_full form of the whole name is
#   bound to $2 (cast to regclass)
# @return the "attname" of the first matching row, or nil when the query
#   returns no row
def primary_key(table)
  query = <<-SQL
    SELECT
      pg_attribute.attname,
      format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
    FROM
      pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
      pg_class.oid = $2::regclass AND
      indrelid = pg_class.oid AND
      nspname = $1 AND
      pg_class.relnamespace = pg_namespace.oid AND
      pg_attribute.attrelid = pg_class.oid AND
      pg_attribute.attnum = any(pg_index.indkey) AND
      indisprimary
  SQL
  row = execute(query, [table.split(".", 2)[0], quote_ident_full(table)])[0]
  row && row["attname"]
end
#restore_command ⇒ Object
116 117 118 119 |
# File 'lib/pgsync/data_source.rb', line 116
# Builds the pg_restore shell command targeting this data source.
#
# "--if-exists" is added only when pg_restore_version is at least 9.4.0.
# The connection URL is shell-escaped so URLs containing shell
# metacharacters (e.g. "&" between query parameters) reach pg_restore as a
# single, intact argument.
#
# @return [String] the command line
def restore_command
  if_exists = Gem::Version.new(pg_restore_version) >= Gem::Version.new("9.4.0")
  "pg_restore --verbose --no-owner --no-acl --clean #{if_exists ? "--if-exists" : nil} -d #{Shellwords.escape(@url)}"
end
#search_path ⇒ Object
131 132 133 |
# File 'lib/pgsync/data_source.rb', line 131
# Memoized list of schemas reported by current_schemas(true).
#
# The value is read as text, its first and last characters are dropped,
# and the remainder is split on commas.
def search_path
  return @search_path if @search_path

  schemas_literal = execute("SELECT current_schemas(true)")[0]["current_schemas"]
  @search_path = schemas_literal[1..-2].split(",")
end
#sequences(table, columns) ⇒ Object
47 48 49 |
# File 'lib/pgsync/data_source.rb', line 47
# Finds the sequences backing the given columns of a table by selecting
# pg_get_serial_sequence for each column in a single query.
#
# @param table table name
# @param columns column names
# @return the non-nil values of the first returned row
def sequences(table, columns)
  select_list = columns.map do |column|
    "pg_get_serial_sequence(#{escape("#{quote_ident_full(table)}")}, #{escape(column)}) AS #{quote_ident(column)}"
  end
  first_row = execute("SELECT #{select_list.join(", ")}")[0]
  first_row.values.compact
end
#table_exists?(table) ⇒ Boolean
38 39 40 |
# File 'lib/pgsync/data_source.rb', line 38
# Tells whether +table+ is a member of table_set.
#
# @param table the table name to look up
# @return [Boolean]
def table_exists?(table)
  table_set.include?(table)
end
#tables ⇒ Object
Gets visible tables.
31 32 33 34 35 36 |
# File 'lib/pgsync/data_source.rb', line 31
# Gets visible tables.
#
# Memoized list of "schema.table" names for every base table outside
# information_schema and pg_catalog, ordered by schema then table name.
def tables
  return @tables if @tables

  rows = execute("SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('information_schema', 'pg_catalog') ORDER BY 1, 2")
  @tables = rows.map { |row| "#{row["table_schema"]}.#{row["table_name"]}" }
end
#truncate(table) ⇒ Object
63 64 65 |
# File 'lib/pgsync/data_source.rb', line 63
# Empties a table with TRUNCATE ... CASCADE.
#
# @param table table name, quoted with quote_ident_full
# @return whatever execute returns for the statement
def truncate(table)
  quoted_table = quote_ident_full(table)
  execute("TRUNCATE #{quoted_table} CASCADE")
end