Class: Webhookdb::DBAdapter::Snowflake

Inherits:
Webhookdb::DBAdapter show all
Includes:
ColumnTypes, DefaultSql
Defined in:
lib/webhookdb/db_adapter/snowflake.rb

Defined Under Namespace

Classes: SnowsqlConnection

Constant Summary collapse

COLTYPE_MAP =
{
  BIGINT => "bigint",
  BOOLEAN => "boolean",
  DATE => "date",
  DECIMAL => "numeric",
  DOUBLE => "double precision",
  FLOAT => "float",
  INTEGER => "integer",
  OBJECT => "object",
  TEXT => "text",
  TIMESTAMP => "timestamptz",
}.freeze

Constants included from DefaultSql

DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS

Constants included from ColumnTypes

ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID

Constants inherited from Webhookdb::DBAdapter

INVALID_IDENTIFIER_MESSAGE, INVALID_IDENTIFIER_PROMPT, VALID_IDENTIFIER

Instance Attribute Summary

Attributes inherited from Webhookdb::DBAdapter

#name, #table, #targets, #unique

Instance Method Summary collapse

Methods included from DefaultSql

#assign_columns_sql, #create_schema_sql, #escape_identifier, #qualify_table

Methods inherited from Webhookdb::DBAdapter

adapter, #create_index_sqls, #create_schema_sql, #escape_identifier, supported_adapters_message, valid_identifier?, validate_identifier!, #verify_connection

Instance Method Details

#_verify_connection(url, timeout:, statement:) ⇒ Object



126
127
128
129
130
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 126

def _verify_connection(url, timeout:, statement:)
  _ = timeout
  conn = self.connection(url)
  conn.execute(statement)
end

#add_column_sql(table, column, if_not_exists: false) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 57

def add_column_sql(table, column, if_not_exists: false)
  c = self.create_column_sql(column)
  # Snowflake has no 'ADD COLUMN IF NOT EXISTS' so we need to query the long way around
  add_sql = "ALTER TABLE #{self.qualify_table(table)} ADD COLUMN #{c}"
  return add_sql unless if_not_exists
  # The 'ILIKE' is a case-insensitive string compare,
  # which is important because snowflake uppercases values when it stores them.
  conditional_sql = "    EXECUTE IMMEDIATE $$\n    BEGIN\n      IF (NOT EXISTS(\n        SELECT * FROM INFORMATION_SCHEMA.COLUMNS\n        WHERE TABLE_SCHEMA ILIKE '\#{table.schema.name}'\n          AND TABLE_NAME ILIKE '\#{table.name}'\n          AND COLUMN_NAME ILIKE '\#{column.name}'\n      )) THEN\n        \#{add_sql};\n      END IF;\n    END;\n    $$\n  SQL\n  return conditional_sql.rstrip\nend\n"

#connection(url) ⇒ Object



26
27
28
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 26

def connection(url)
  return SnowsqlConnection.new(url)
end

#create_column_sql(column) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 45

def create_column_sql(column)
  modifiers = +""
  if column.unique?
    modifiers << " UNIQUE NOT NULL"
  elsif !column.nullable?
    modifiers << " NOT NULL"
  end
  coltype = COLTYPE_MAP.fetch(column.type)
  colname = self.escape_identifier(column.name)
  return "#{colname} #{coltype}#{modifiers}"
end

#create_index_sqlObject



30
31
32
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 30

def create_index_sql(*)
  raise Webhookdb::InvalidPrecondition, "Snowflake does not support indices"
end

#create_table_sql(table, columns, if_not_exists: false) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 34

def create_table_sql(table, columns, if_not_exists: false, **)
  createtable = +"CREATE TABLE "
  createtable << "IF NOT EXISTS " if if_not_exists
  createtable << self.qualify_table(table)
  lines = ["#{createtable} ("]
  columns[0...-1]&.each { |c| lines << "  #{self.create_column_sql(c)}," }
  lines << "  #{self.create_column_sql(columns.last)}"
  lines << ")"
  return lines.join("\n")
end

#identifier_quote_charObject



132
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 132

def identifier_quote_char = ""

#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 81

def merge_from_csv(connection, file, table, pk_col, copy_columns)
  raise Webhookdb::InvalidPrecondition, "table must have schema" if table.schema.nil?

  qtable = self.qualify_table(table)

  stage = self.escape_identifier("whdb_tempstage_#{SecureRandom.hex(2)}_#{table.name}")
  stage = self.escape_identifier(table.schema.name) + "." + stage

  pkname = self.escape_identifier(pk_col.name)
  # JSON columns need to be parsed from the CSV, so object columns need parse_json calls.
  col_assigns = self.assign_columns_sql("src", nil, copy_columns) do |c, lhs, rhs|
    if c.type == OBJECT
      [lhs, "parse_json(#{rhs})"]
    else
      [lhs, rhs]
    end
  end
  col_names = copy_columns.map { |c| self.escape_identifier(c.name) }
  col_values = col_names.each_with_index.map do |n, i|
    if copy_columns[i].type == OBJECT
      "parse_json(src.#{n})"
    else
      "src.#{n}"
    end
  end
  col_placeholders = col_names.each_with_index.map { |n, i| "$#{i + 1} #{n}" }
  # Props to https://stackoverflow.com/questions/63084511/snowflake-upsert-from-staged-files
  # for the merge from stage code.
  # The enclosed option is vital because otherwise it doesn't interpret JSON columns properly.
  import_sql = "    CREATE STAGE \#{stage} FILE_FORMAT = (type = 'CSV' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');\n\n    PUT file://\#{file.path} @\#{stage} auto_compress=true;\n\n    MERGE INTO \#{qtable} AS tgt\n      USING (\n        SELECT \#{col_placeholders.join(', ')} FROM @\#{stage}\n      ) src\n      ON tgt.\#{pkname} = src.\#{pkname}\n      WHEN MATCHED THEN UPDATE SET \#{col_assigns}\n      WHEN NOT MATCHED THEN INSERT (\#{col_names.join(', ')}) values (\#{col_values.join(', ')});\n  SQL\n  connection.execute(import_sql)\nend\n"