Class: Webhookdb::DBAdapter::Snowflake

Inherits:
Webhookdb::DBAdapter show all
Includes:
ColumnTypes, DefaultSql
Defined in:
lib/webhookdb/db_adapter/snowflake.rb

Defined Under Namespace

Classes: SnowsqlConnection

Constant Summary collapse

# Maps the adapter-generic ColumnTypes constants (BIGINT, BOOLEAN, ...) to the
# type names this adapter emits in Snowflake DDL.
# NOTE(review): several values ("bigint", "double precision", "timestamptz")
# mirror Postgres type names — presumably Snowflake accepts these as aliases;
# confirm against Snowflake's data-type documentation.
COLTYPE_MAP =
{
  BIGINT => "bigint",
  BOOLEAN => "boolean",
  DATE => "date",
  DECIMAL => "numeric",
  DOUBLE => "double precision",
  FLOAT => "float",
  INTEGER => "integer",
  OBJECT => "object",
  TEXT => "text",
  TIMESTAMP => "timestamptz",
}.freeze

Constants included from DefaultSql

DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS

Constants included from ColumnTypes

ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID

Constants inherited from Webhookdb::DBAdapter

INVALID_IDENTIFIER_MESSAGE, VALID_IDENTIFIER

Instance Attribute Summary

Attributes inherited from Webhookdb::DBAdapter

#name, #table, #targets, #unique

Instance Method Summary collapse

Methods included from DefaultSql

#assign_columns_sql, #create_schema_sql, #create_table_sql, #escape_identifier, #qualify_table

Methods inherited from Webhookdb::DBAdapter

adapter, #create_schema_sql, #create_table_sql, supported_adapters_message, #verify_connection

Instance Method Details

#_verify_connection(url, timeout:, statement:) ⇒ Object



115
116
117
118
119
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 115

# Verify connectivity by running +statement+ through a fresh connection.
# The +timeout+ keyword is accepted for interface parity but is not used here.
def _verify_connection(url, timeout:, statement:)
  _ = timeout
  self.connection(url).execute(statement)
end

#add_column_sql(table, column, if_not_exists: false) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 46

# Build the SQL to add +column+ to +table+.
#
# When +if_not_exists+ is false, returns a plain ALTER TABLE ... ADD COLUMN
# statement. When true, wraps the ALTER in a Snowflake Scripting block
# (EXECUTE IMMEDIATE $$...$$) that first checks INFORMATION_SCHEMA.COLUMNS,
# because Snowflake has no 'ADD COLUMN IF NOT EXISTS'.
#
# NOTE: the extracted source had this heredoc collapsed onto a single escaped
# line; it is reconstructed here as the squiggly heredoc the escapes encode.
def add_column_sql(table, column, if_not_exists: false)
  c = self.column_create_sql(column)
  # Snowflake has no 'ADD COLUMN IF NOT EXISTS' so we need to query the long way around
  add_sql = "ALTER TABLE #{self.qualify_table(table)} ADD COLUMN #{c}"
  return add_sql unless if_not_exists
  # The 'ILIKE' is a case-insensitive string compare,
  # which is important because snowflake uppercases values when it stores them.
  conditional_sql = <<~SQL
    EXECUTE IMMEDIATE $$
    BEGIN
      IF (NOT EXISTS(
        SELECT * FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA ILIKE '#{table.schema.name}'
          AND TABLE_NAME ILIKE '#{table.name}'
          AND COLUMN_NAME ILIKE '#{column.name}'
      )) THEN
        #{add_sql};
      END IF;
    END;
    $$
  SQL
  return conditional_sql.rstrip
end

#column_create_sql(column) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 34

# Render one column definition for DDL: escaped name, mapped type,
# and UNIQUE/NOT NULL modifiers derived from the column's flags.
def column_create_sql(column)
  constraint =
    if column.unique?
      " UNIQUE NOT NULL"
    elsif column.nullable?
      ""
    else
      " NOT NULL"
    end
  quoted_name = self.escape_identifier(column.name)
  sql_type = COLTYPE_MAP.fetch(column.type)
  return "#{quoted_name} #{sql_type}#{constraint}"
end

#connection(url) ⇒ Object



26
27
28
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 26

# Build a snowsql-backed connection wrapper for the given URL.
def connection(url)
  SnowsqlConnection.new(url)
end

#create_index_sql ⇒ Object

Raises:



30
31
32
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 30

# Index creation is unsupported on Snowflake; always raises NotImplementedError.
def create_index_sql(*_args)
  raise NotImplementedError, "Snowflake does not support indices"
end

#identifier_quote_char ⇒ Object



121
122
123
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 121

# This adapter uses no quote character around identifiers, so the
# quote char is the empty string.
def identifier_quote_char
  ""
end

#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object

Raises:



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 70

# Upsert rows from the CSV +file+ into +table+ through a temporary Snowflake
# stage: CREATE STAGE, PUT the file, then MERGE on +pk_col+, updating matched
# rows and inserting unmatched ones across +copy_columns+.
#
# @param connection [#execute] connection the batched SQL is executed on.
# @param file [#path] local CSV file (header row is skipped on load).
# @param table [Webhookdb::DBAdapter::Table] target table; must have a schema.
# @param pk_col [Webhookdb::DBAdapter::Column] primary key used as merge key.
# @param copy_columns [Array<Webhookdb::DBAdapter::Column>] columns to copy.
# @raise [Webhookdb::InvalidPrecondition] if the table has no schema.
#
# NOTE: the extracted source had the SQL heredoc collapsed onto a single
# escaped line; it is reconstructed here as the heredoc the escapes encode.
def merge_from_csv(connection, file, table, pk_col, copy_columns)
  raise Webhookdb::InvalidPrecondition, "table must have schema" if table.schema.nil?

  qtable = self.qualify_table(table)

  # Random suffix avoids collisions if concurrent merges target the same table.
  stage = self.escape_identifier("whdb_tempstage_#{SecureRandom.hex(2)}_#{table.name}")
  stage = self.escape_identifier(table.schema.name) + "." + stage

  pkname = self.escape_identifier(pk_col.name)
  # JSON columns need to be parsed from the CSV, so object columns need parse_json calls.
  col_assigns = self.assign_columns_sql("src", nil, copy_columns) do |c, lhs, rhs|
    if c.type == OBJECT
      [lhs, "parse_json(#{rhs})"]
    else
      [lhs, rhs]
    end
  end
  col_names = copy_columns.map { |c| self.escape_identifier(c.name) }
  col_values = col_names.each_with_index.map do |n, i|
    if copy_columns[i].type == OBJECT
      "parse_json(src.#{n})"
    else
      "src.#{n}"
    end
  end
  # Stage columns are addressed positionally ($1, $2, ...) and aliased to names.
  col_placeholders = col_names.each_with_index.map { |n, i| "$#{i + 1} #{n}" }
  # Props to https://stackoverflow.com/questions/63084511/snowflake-upsert-from-staged-files
  # for the merge from stage code.
  # The enclosed option is vital because otherwise it doesn't interpret JSON columns properly.
  import_sql = <<~SQL
    CREATE STAGE #{stage} FILE_FORMAT = (type = 'CSV' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

    PUT file://#{file.path} @#{stage} auto_compress=true;

    MERGE INTO #{qtable} AS tgt
      USING (
        SELECT #{col_placeholders.join(', ')} FROM @#{stage}
      ) src
      ON tgt.#{pkname} = src.#{pkname}
      WHEN MATCHED THEN UPDATE SET #{col_assigns}
      WHEN NOT MATCHED THEN INSERT (#{col_names.join(', ')}) values (#{col_values.join(', ')});
  SQL
  connection.execute(import_sql)
end