Class: PgSync::DataSource

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/pgsync/data_source.rb

Constant Summary

Constants included from Utils

Utils::COLOR_CODES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #monotonic_time, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning

Constructor Details

#initialize(url, name:, debug:) ⇒ DataSource

Returns a new instance of DataSource.



7
8
9
10
11
# File 'lib/pgsync/data_source.rb', line 7

def initialize(url, name:, debug:)
  @url = url
  @name = name
  @debug = debug
end

Instance Attribute Details

#urlObject (readonly)

Returns the value of attribute url.



5
6
7
# File 'lib/pgsync/data_source.rb', line 5

def url
  @url
end

Instance Method Details

#closeObject



125
126
127
128
129
130
# File 'lib/pgsync/data_source.rb', line 125

def close
  if @conn
    @conn.close
    @conn = nil
  end
end

#connObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/pgsync/data_source.rb', line 108

def conn
  @conn ||= begin
    begin
      ENV["PGCONNECT_TIMEOUT"] ||= "3"
      if @url.start_with?("postgres://", "postgresql://")
        config = @url
      else
        config = {dbname: @url}
      end
      @concurrent_id = concurrent_id
      PG::Connection.new(config)
    rescue URI::InvalidURIError
      raise Error, "Invalid connection string. Make sure it works with `psql`"
    end
  end
end

#create_schema(schema) ⇒ Object



89
90
91
# File 'lib/pgsync/data_source.rb', line 89

def create_schema(schema)
  execute("CREATE SCHEMA #{quote_ident(schema)}")
end

#dbnameObject



34
35
36
# File 'lib/pgsync/data_source.rb', line 34

def dbname
  @dbname ||= conninfo[:dbname]
end

#execute(query, params = []) ⇒ Object



145
146
147
148
# File 'lib/pgsync/data_source.rb', line 145

def execute(query, params = [])
  log_sql query, params
  conn.exec_params(query, params).to_a
end

#exists?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/pgsync/data_source.rb', line 13

def exists?
  @url && @url.size > 0
end

#hostObject



26
27
28
# File 'lib/pgsync/data_source.rb', line 26

def host
  @host ||= dedup_localhost(conninfo[:host])
end

#last_value(seq) ⇒ Object



68
69
70
# File 'lib/pgsync/data_source.rb', line 68

def last_value(seq)
  execute("SELECT last_value FROM #{quote_ident_full(seq)}").first["last_value"]
end

#local?Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/pgsync/data_source.rb', line 17

def local?
  socket? || %w(localhost 127.0.0.1).include?(host)
end

#log_sql(query, params = {}) ⇒ Object

TODO log time for each statement



166
167
168
169
170
171
172
# File 'lib/pgsync/data_source.rb', line 166

def log_sql(query, params = {})
  if @debug
    message = "#{colorize("[#{@name}]", :cyan)} #{query.gsub(/\s+/, " ").strip}"
    message = "#{message} #{params.inspect}" if params.any?
    log message
  end
end

#max_id(table, primary_key, sql_clause = nil) ⇒ Object



60
61
62
# File 'lib/pgsync/data_source.rb', line 60

def max_id(table, primary_key, sql_clause = nil)
  execute("SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}").first["max"].to_i
end

#min_id(table, primary_key, sql_clause = nil) ⇒ Object



64
65
66
# File 'lib/pgsync/data_source.rb', line 64

def min_id(table, primary_key, sql_clause = nil)
  execute("SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}").first["min"].to_i
end

#portObject



30
31
32
# File 'lib/pgsync/data_source.rb', line 30

def port
  @port ||= dedup_localhost(conninfo[:port])
end

#reconnect_if_neededObject

reconnect for new thread or process



133
134
135
# File 'lib/pgsync/data_source.rb', line 133

def reconnect_if_needed
  reconnect if @concurrent_id != concurrent_id
end

#schemasObject



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pgsync/data_source.rb', line 76

def schemas
  @schemas ||= begin
    query = <<~SQL
      SELECT
        schema_name
      FROM
        information_schema.schemata
      ORDER BY 1
    SQL
    execute(query).map { |row| row["schema_name"] }
  end
end

#search_pathObject



137
138
139
# File 'lib/pgsync/data_source.rb', line 137

def search_path
  @search_path ||= execute("SELECT unnest(current_schemas(true)) AS schema").map { |r| r["schema"] }
end

#server_version_numObject



141
142
143
# File 'lib/pgsync/data_source.rb', line 141

def server_version_num
  @server_version_num ||= execute("SHOW server_version_num").first["server_version_num"].to_i
end

#socket?Boolean

host can be “/var/run/postgresql,/run/postgresql,/tmp” on Linux with pg 1.6+

Returns:

  • (Boolean)


22
23
24
# File 'lib/pgsync/data_source.rb', line 22

def socket?
  !host || host.split(",").all? { |v| v.start_with?("/") }
end

#table_exists?(table) ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/pgsync/data_source.rb', line 56

def table_exists?(table)
  table_set.include?(table)
end

#tablesObject

gets visible tables



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pgsync/data_source.rb', line 39

def tables
  @tables ||= begin
    query = <<~SQL
      SELECT
        table_schema AS schema,
        table_name AS table
      FROM
        information_schema.tables
      WHERE
        table_type = 'BASE TABLE' AND
        table_schema NOT IN ('information_schema', 'pg_catalog')
      ORDER BY 1, 2
    SQL
    execute(query).map { |row| Table.new(row["schema"], row["table"]) }
  end
end

#transactionObject



150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/pgsync/data_source.rb', line 150

def transaction
  if conn.transaction_status == 0
    # not currently in transaction
    log_sql "BEGIN"
    result =
      conn.transaction do
        yield
      end
    log_sql "COMMIT"
    result
  else
    yield
  end
end

#triggers(table) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/pgsync/data_source.rb', line 93

def triggers(table)
  query = <<~SQL
    SELECT
      tgname AS name,
      tgisinternal AS internal,
      tgenabled != 'D' AS enabled,
      tgconstraint != 0 AS integrity
    FROM
      pg_trigger
    WHERE
      pg_trigger.tgrelid = $1::regclass
  SQL
  execute(query, [quote_ident_full(table)])
end

#truncate(table) ⇒ Object



72
73
74
# File 'lib/pgsync/data_source.rb', line 72

def truncate(table)
  execute("TRUNCATE #{quote_ident_full(table)} CASCADE")
end