Class: PgSync::DataSource

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/pgsync/data_source.rb

Constant Summary

Constants included from Utils

Utils::COLOR_CODES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning

Constructor Details

#initialize(url) ⇒ DataSource

Returns a new instance of DataSource.



7
8
9
# File 'lib/pgsync/data_source.rb', line 7

# Remembers +url+ in @url; no other work is done at construction time.
#
# @param url [Object] connection target, stored exactly as given
def initialize(url)
  @url = url
end

Instance Attribute Details

#url ⇒ Object (readonly)

Returns the value of attribute url.



5
6
7
# File 'lib/pgsync/data_source.rb', line 5

# Reader for the stored connection target.
#
# @return [Object] the current value of @url
def url
  @url
end

Instance Method Details

#close ⇒ Object



130
131
132
133
134
135
# File 'lib/pgsync/data_source.rb', line 130

# Closes the cached connection, if there is one, and forgets it.
#
# Does nothing when @conn is not set.
#
# @return [nil]
def close
  return unless @conn

  @conn.close
  @conn = nil
end

#conn ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/pgsync/data_source.rb', line 113

# Returns the memoized connection, opening it on first use.
#
# A value of @url starting with postgres:// or postgresql:// is handed to
# PG::Connection.new unchanged; anything else is passed as {dbname: @url}.
# PGCONNECT_TIMEOUT is set to "3" in ENV only when it is not already set.
#
# @return [PG::Connection]
# @raise [Error] when connecting raises URI::InvalidURIError
def conn
  @conn ||= begin
    ENV["PGCONNECT_TIMEOUT"] ||= "3"
    config = (@url =~ /\Apostgres(ql)?:\/\//) ? @url : {dbname: @url}
    # remember the id this connection was opened under; reconnect_if_needed compares against it
    @concurrent_id = concurrent_id
    PG::Connection.new(config)
  rescue URI::InvalidURIError
    raise Error, "Invalid connection string. Make sure it works with `psql`"
  end
end

#dbname ⇒ Object



27
28
29
# File 'lib/pgsync/data_source.rb', line 27

# Database name taken from conninfo[:dbname], cached in @dbname once truthy.
#
# @return [Object] whatever conninfo holds under :dbname
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end

#execute(query, params = []) ⇒ Object



150
151
152
# File 'lib/pgsync/data_source.rb', line 150

# Runs +query+ through conn.exec_params and materializes the result.
#
# @param query [String] SQL text, may contain $1-style placeholders
# @param params [Array] values bound to the placeholders
# @return [Array] the result converted with #to_a
def execute(query, params = [])
  result = conn.exec_params(query, params)
  result.to_a
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


11
12
13
# File 'lib/pgsync/data_source.rb', line 11

# Tells whether a non-empty connection target was supplied.
#
# @return [Boolean, nil] nil when @url is nil, otherwise whether @url is non-empty
def exists?
  @url && !@url.empty?
end

#host ⇒ Object



19
20
21
# File 'lib/pgsync/data_source.rb', line 19

# Host taken from conninfo[:host], cached in @host once truthy.
#
# @return [Object] whatever conninfo holds under :host
def host
  return @host if @host

  @host = conninfo[:host]
end

#last_value(seq) ⇒ Object

The seq value comes from pg_get_serial_sequence, which returns it already quoted.



66
67
68
# File 'lib/pgsync/data_source.rb', line 66

# Reads the last_value column of a sequence.
#
# +seq+ is interpolated into the SQL as-is: it comes from
# pg_get_serial_sequence, which already returns a quoted name.
#
# @param seq [String] sequence name, already quoted
# @return [Object] the "last_value" field of the first result row
def last_value(seq)
  row = execute("SELECT last_value FROM #{seq}").first
  row["last_value"]
end

#local? ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/pgsync/data_source.rb', line 15

# Tells whether the data source points at the local machine.
#
# A missing host counts as local, as do "localhost" and "127.0.0.1".
#
# @return [Boolean]
def local?
  return true unless host

  %w(localhost 127.0.0.1).include?(host)
end

#max_id(table, primary_key, sql_clause = nil) ⇒ Object



57
58
59
# File 'lib/pgsync/data_source.rb', line 57

# Largest value of +primary_key+ in +table+, as an Integer.
#
# @param table [Object] table reference accepted by quote_ident_full
# @param primary_key [String] column name, quoted with quote_ident
# @param sql_clause [String, nil] raw SQL appended directly after the table name
# @return [Integer] the "max" field converted with to_i (0 when it is nil)
def max_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MAX(#{column}) FROM #{relation}#{sql_clause}").first
  row["max"].to_i
end

#min_id(table, primary_key, sql_clause = nil) ⇒ Object



61
62
63
# File 'lib/pgsync/data_source.rb', line 61

# Smallest value of +primary_key+ in +table+, as an Integer.
#
# @param table [Object] table reference accepted by quote_ident_full
# @param primary_key [String] column name, quoted with quote_ident
# @param sql_clause [String, nil] raw SQL appended directly after the table name
# @return [Integer] the "min" field converted with to_i (0 when it is nil)
def min_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MIN(#{column}) FROM #{relation}#{sql_clause}").first
  row["min"].to_i
end

#port ⇒ Object



23
24
25
# File 'lib/pgsync/data_source.rb', line 23

# Port taken from conninfo[:port], cached in @port once truthy.
#
# @return [Object] whatever conninfo holds under :port
def port
  return @port if @port

  @port = conninfo[:port]
end

#primary_key(table) ⇒ Object

See stackoverflow.com/a/20537829. TODO: this can be simplified with array_position in Postgres 9.5+.



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/pgsync/data_source.rb', line 76

# Names of the columns that make up the primary key of +table+,
# in the order they appear in the index definition.
#
# @param table [#schema, #name] table whose schema and name are bound to $1 and $2
# @return [Array<String>] the attname values, ordered by position in indkey
def primary_key(table)
  rows = execute(<<~SQL, [table.schema, table.name])
    SELECT
      pg_attribute.attname,
      format_type(pg_attribute.atttypid, pg_attribute.atttypmod),
      pg_attribute.attnum,
      pg_index.indkey
    FROM
      pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
      nspname = $1 AND
      relname = $2 AND
      indrelid = pg_class.oid AND
      pg_class.relnamespace = pg_namespace.oid AND
      pg_attribute.attrelid = pg_class.oid AND
      pg_attribute.attnum = any(pg_index.indkey) AND
      indisprimary
  SQL

  # indkey is a space-separated list of attnums; a column's rank is its index in that list
  ordered = rows.sort_by do |row|
    key_positions = row["indkey"].split(" ")
    key_positions.index(row["attnum"])
  end
  ordered.map { |row| row["attname"] }
end

#reconnect_if_needed ⇒ Object

reconnect for new thread or process



138
139
140
# File 'lib/pgsync/data_source.rb', line 138

# Reconnects when running in a new thread or process.
#
# Compares the id stored in @concurrent_id with the current concurrent_id
# and calls reconnect only when they differ.
#
# @return [Object, nil] the result of reconnect, or nil when nothing was done
def reconnect_if_needed
  return if @concurrent_id == concurrent_id

  reconnect
end

#search_path ⇒ Object



142
143
144
# File 'lib/pgsync/data_source.rb', line 142

# Schemas reported by current_schemas(true), one entry per schema.
#
# The list is queried once and cached in @search_path.
#
# @return [Array] the "schema" field of every result row
def search_path
  return @search_path if @search_path

  rows = execute("SELECT unnest(current_schemas(true)) AS schema")
  @search_path = rows.map { |row| row["schema"] }
end

#sequences(table, columns) ⇒ Object



53
54
55
# File 'lib/pgsync/data_source.rb', line 53

# Looks up the sequences backing the given columns of +table+ via
# pg_get_serial_sequence.
#
# @param table [Object] table reference accepted by quote_ident_full
# @param columns [Array<String>] column names to inspect
# @return [Array] sequence names for the columns that have one (nil results are dropped);
#   empty when +columns+ is empty
def sequences(table, columns)
  # An empty column list would build the invalid statement "SELECT ", so answer directly.
  return [] if columns.empty?

  # The escaped table name is the same for every column, so compute it once.
  relation = escape(quote_ident_full(table))
  select_list = columns.map do |column|
    "pg_get_serial_sequence(#{relation}, #{escape(column)}) AS #{quote_ident(column)}"
  end
  execute("SELECT #{select_list.join(", ")}").first.values.compact
end

#server_version_num ⇒ Object



146
147
148
# File 'lib/pgsync/data_source.rb', line 146

# Server version as reported by SHOW server_version_num, converted with to_i.
#
# The value is queried once and cached in @server_version_num.
#
# @return [Integer]
def server_version_num
  return @server_version_num if @server_version_num

  row = execute("SHOW server_version_num").first
  @server_version_num = row["server_version_num"].to_i
end

#table_exists?(table) ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/pgsync/data_source.rb', line 49

# Tells whether +table+ is a member of table_set.
#
# @param table [Object] table to look up
# @return [Boolean]
def table_exists?(table)
  known_tables = table_set
  known_tables.include?(table)
end

#tables ⇒ Object

gets visible tables



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/pgsync/data_source.rb', line 32

# Lists the base tables visible in information_schema.tables, skipping the
# information_schema and pg_catalog schemas, ordered by schema then name.
#
# The list is queried once and cached in @tables.
#
# @return [Array<Table>] one Table per result row
def tables
  return @tables if @tables

  rows = execute(<<~SQL)
    SELECT
      table_schema AS schema,
      table_name AS table
    FROM
      information_schema.tables
    WHERE
      table_type = 'BASE TABLE' AND
      table_schema NOT IN ('information_schema', 'pg_catalog')
    ORDER BY 1, 2
  SQL
  @tables = rows.map { |row| Table.new(row["schema"], row["table"]) }
end

#transaction ⇒ Object



154
155
156
157
158
159
160
161
162
163
# File 'lib/pgsync/data_source.rb', line 154

# Runs the given block inside a transaction, opening one only when the
# connection is not already in one.
#
# @yield the work to perform
# @return [Object] the value of conn.transaction when one is opened here,
#   otherwise the block's value
def transaction
  # a non-zero status means a transaction is already open: just run the block
  return yield unless conn.transaction_status == 0

  conn.transaction { yield }
end

#triggers(table) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/pgsync/data_source.rb', line 98

# Lists the triggers defined on +table+.
#
# Each row carries name, internal (tgisinternal), enabled (tgenabled is not 'D')
# and integrity (tgconstraint is not 0). The quoted table name is bound to $1
# and cast to regclass by the query.
#
# @param table [Object] table reference accepted by quote_ident_full
# @return [Array] result rows from execute
def triggers(table)
  execute(<<~SQL, [quote_ident_full(table)])
    SELECT
      tgname AS name,
      tgisinternal AS internal,
      tgenabled != 'D' AS enabled,
      tgconstraint != 0 AS integrity
    FROM
      pg_trigger
    WHERE
      pg_trigger.tgrelid = $1::regclass
  SQL
end

#truncate(table) ⇒ Object



70
71
72
# File 'lib/pgsync/data_source.rb', line 70

# Issues TRUNCATE ... CASCADE for +table+.
#
# @param table [Object] table reference accepted by quote_ident_full
# @return [Array] whatever execute returns for the statement
def truncate(table)
  target = quote_ident_full(table)
  execute("TRUNCATE #{target} CASCADE")
end