Class: Webhookdb::DBAdapter::PG

Inherits:
Webhookdb::DBAdapter show all
Includes:
ColumnTypes, DefaultSql
Defined in:
lib/webhookdb/db_adapter/pg.rb

Constant Summary collapse

# Same value as the default +timeout:+ of #verify_connection, which multiplies
# its timeout by 1000 before using it as the statement timeout.
# NOTE(review): presumably seconds; the method listing uses a literal 2 rather
# than this constant, so confirm where the constant is actually referenced.
VERIFY_TIMEOUT =
2
# Same value as the default +statement:+ of #verify_connection.
VERIFY_STATEMENT =
"SELECT 1"
# Maps each ColumnTypes constant to the SQL type name emitted by
# #column_create_sql. Lookups there use +fetch+, so a type missing from this
# map raises KeyError instead of producing SQL with an empty type.
COLTYPE_MAP =
{
  BIGINT => "bigint",
  BIGINT_ARRAY => "bigint[]",
  BOOLEAN => "boolean",
  DATE => "date",
  DECIMAL => "numeric",
  DOUBLE => "double precision",
  FLOAT => "float",
  INTEGER => "integer",
  INTEGER_ARRAY => "integer[]",
  OBJECT => "jsonb",
  TEXT => "text",
  TEXT_ARRAY => "text[]",
  TIMESTAMP => "timestamptz",
  UUID => "uuid",
}.freeze

Constants included from DefaultSql

DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS

Constants included from ColumnTypes

ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID

Constants inherited from Webhookdb::DBAdapter

INVALID_IDENTIFIER_MESSAGE, VALID_IDENTIFIER

Instance Attribute Summary

Attributes inherited from Webhookdb::DBAdapter

#name, #table, #targets, #unique

Instance Method Summary collapse

Methods included from DefaultSql

#assign_columns_sql, #create_schema_sql, #create_table_sql, #escape_identifier, #qualify_table

Methods inherited from Webhookdb::DBAdapter

adapter, #connection, #create_schema_sql, #create_table_sql, supported_adapters_message

Instance Method Details

#add_column_sql(table, column, if_not_exists: false) ⇒ Object



45
46
47
48
49
# File 'lib/webhookdb/db_adapter/pg.rb', line 45

# Builds the ALTER TABLE statement that adds +column+ to +table+.
#
# @param table the table to alter; rendered through #qualify_table.
# @param column the column to add; rendered through #column_create_sql.
# @param if_not_exists [Boolean] when truthy, emits ADD COLUMN IF NOT EXISTS.
# @return [String] the SQL statement (no trailing semicolon).
def add_column_sql(table, column, if_not_exists: false)
  column_definition = self.column_create_sql(column)
  existence_guard =
    if if_not_exists
      " IF NOT EXISTS"
    else
      ""
    end
  qualified = self.qualify_table(table)
  return "ALTER TABLE #{qualified} ADD COLUMN#{existence_guard} #{column_definition}"
end

#column_create_sql(column) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/webhookdb/db_adapter/pg.rb', line 29

# Renders the "<name> <type><constraints>" fragment for one column, as used
# in column-adding DDL.
#
# The SQL type comes from COLTYPE_MAP. Primary keys of type INTEGER or BIGINT
# are rendered as "serial" / "bigserial" instead. Exactly one constraint
# suffix is chosen, checked in this order: primary key, unique, not-nullable.
#
# @param column an object responding to +name+, +type+, +pk?+, +unique?+ and +nullable?+.
# @return [String] the column definition SQL.
# @raise [KeyError] if +column.type+ has no entry in COLTYPE_MAP.
def column_create_sql(column)
  type = column.type
  sql_type = COLTYPE_MAP.fetch(type)
  constraint = ""
  if column.pk?
    if type == INTEGER
      sql_type = "serial"
    elsif type == BIGINT
      sql_type = "bigserial"
    end
    constraint = " PRIMARY KEY"
  elsif column.unique?
    constraint = " UNIQUE NOT NULL"
  elsif !column.nullable?
    constraint = " NOT NULL"
  end
  escaped_name = self.escape_identifier(column.name)
  return "#{escaped_name} #{sql_type}#{constraint}"
end

#create_index_sql(index, concurrently:) ⇒ Object



18
19
20
21
22
23
24
25
26
27
# File 'lib/webhookdb/db_adapter/pg.rb', line 18

# Builds a CREATE INDEX IF NOT EXISTS statement for +index+.
#
# @param index an object responding to +name+, +table+, +targets+ (each with
#   a +name+), +unique+ and +where+.
# @param concurrently [Boolean] when truthy, emits CREATE INDEX CONCURRENTLY.
# @return [String] the SQL statement (no trailing semicolon).
def create_index_sql(index, concurrently:)
  column_list = index.targets.map { |target| self.escape_identifier(target.name) }.join(", ")
  unique_kw = ""
  unique_kw = " UNIQUE" if index.unique
  concurrently_kw = ""
  concurrently_kw = " CONCURRENTLY" if concurrently
  index_name = self.escape_identifier(index.name)
  table_name = self.qualify_table(index.table)
  predicate = ""
  if index.where
    # The partial-index condition is rendered by building a query against
    # Webhookdb::Customer and stripping the fixed SELECT prefix, which leaves
    # only the trailing WHERE clause. The customers table itself is irrelevant
    # to the result (presumably it is just a convenient dataset to render with).
    select_sql = Webhookdb::Customer.where(index.where).sql
    predicate = " " + select_sql.delete_prefix('SELECT * FROM "customers" ')
  end
  return "CREATE#{unique_kw} INDEX#{concurrently_kw} IF NOT EXISTS #{index_name} ON #{table_name} (#{column_list})#{predicate}"
end

#identifier_quote_char ⇒ Object



14
15
16
# File 'lib/webhookdb/db_adapter/pg.rb', line 14

# The character this adapter reports for quoting SQL identifiers:
# a double quote.
#
# @return [String]
def identifier_quote_char
  return "\""
end

#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/webhookdb/db_adapter/pg.rb', line 51

def merge_from_csv(connection, file, table, pk_col, copy_columns)
  qtable = self.qualify_table(table)
  temptable = "#{self.escape_identifier(table.name)}_staging_#{SecureRandom.hex(4)}"
  connection.using do |db|
    db << "CREATE TEMP TABLE #{temptable} (LIKE #{qtable})"
    db.copy_into(temptable.to_sym, options: "DELIMITER ',', HEADER true, FORMAT csv", data: file)
    pkname = self.escape_identifier(pk_col.name)
    col_assigns = self.assign_columns_sql("src", nil, copy_columns)
    upsert_sql = [
      <<~UPDATE,
        UPDATE #{qtable} AS tgt
        SET #{col_assigns} FROM
        (SELECT * FROM #{temptable} WHERE #{pkname} IN (SELECT #{pkname} FROM #{qtable})) src
        WHERE tgt.#{pkname} = src.#{pkname};
      UPDATE
      "INSERT INTO #{qtable} SELECT * FROM #{temptable} WHERE #{pkname} NOT IN (SELECT #{pkname} FROM #{qtable});",
    ]
    db << upsert_sql.join("\n")
  end
end

#verify_connection(url, timeout: 2, statement: "SELECT 1") ⇒ Object



72
73
74
75
76
77
78
# File 'lib/webhookdb/db_adapter/pg.rb', line 72

# Checks that the database at +url+ is reachable by running a trivial statement.
#
# Opens a connection with +timeout+ as the connect timeout, applies the same
# limit as the session's statement_timeout, then executes +statement+.
# Errors raised while connecting or executing are not rescued here.
#
# @param url the connection URL, passed to #connection.
# @param timeout [Numeric] limit in seconds (it is multiplied by 1000 for
#   statement_timeout). The default equals VERIFY_TIMEOUT.
# @param statement [String] the SQL to run. The default equals VERIFY_STATEMENT.
# @return whatever the +using+ block returns (the result of executing +statement+).
def verify_connection(url, timeout: 2, statement: "SELECT 1")
  conn = self.connection(url)
  conn.using(connect_timeout: timeout) do |c|
    # statement_timeout is an integer number of milliseconds. Round so that a
    # fractional timeout (e.g. 1.5) is sent as "1500" rather than "1500.0",
    # which older PostgreSQL versions reject for integer settings.
    timeout_ms = (timeout * 1000).round
    c.execute("SET statement_timeout TO #{timeout_ms}")
    c.execute(statement)
  end
end