Class: Webhookdb::DBAdapter::Snowflake

Inherits:
Webhookdb::DBAdapter show all
Includes:
ColumnTypes, DefaultSql
Defined in:
lib/webhookdb/db_adapter/snowflake.rb

Defined Under Namespace

Classes: SnowsqlConnection

Constant Summary collapse

COLTYPE_MAP =
{
  BIGINT => "bigint",
  BOOLEAN => "boolean",
  DATE => "date",
  DECIMAL => "numeric",
  DOUBLE => "double precision",
  FLOAT => "float",
  INTEGER => "integer",
  OBJECT => "object",
  TEXT => "text",
  TIMESTAMP => "timestamptz",
}.freeze

Constants included from DefaultSql

DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS

Constants included from ColumnTypes

ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID

Constants inherited from Webhookdb::DBAdapter

INVALID_IDENTIFIER_MESSAGE, INVALID_IDENTIFIER_PROMPT, VALID_IDENTIFIER

Instance Attribute Summary

Attributes inherited from Webhookdb::DBAdapter

#name, #table, #targets, #unique

Instance Method Summary collapse

Methods included from DefaultSql

#assign_columns_sql, #create_schema_sql, #create_table_sql, #escape_identifier, #qualify_table

Methods inherited from Webhookdb::DBAdapter

adapter, #create_schema_sql, #create_table_sql, supported_adapters_message, valid_identifier?, validate_identifier!, #verify_connection

Instance Method Details

#_verify_connection(url, timeout:, statement:) ⇒ Object



115
116
117
118
119
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 115

def _verify_connection(url, timeout:, statement:)
  _ = timeout
  conn = self.connection(url)
  conn.execute(statement)
end

#add_column_sql(table, column, if_not_exists: false) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 46

def add_column_sql(table, column, if_not_exists: false)
  c = self.column_create_sql(column)
  # Snowflake has no 'ADD COLUMN IF NOT EXISTS' so we need to query the long way around
  add_sql = "ALTER TABLE #{self.qualify_table(table)} ADD COLUMN #{c}"
  return add_sql unless if_not_exists
  # The 'ILIKE' is a case-insensitive string compare,
  # which is important because snowflake uppercases values when it stores them.
  conditional_sql = <<~SQL
    EXECUTE IMMEDIATE $$
    BEGIN
      IF (NOT EXISTS(
        SELECT * FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA ILIKE '#{table.schema.name}'
          AND TABLE_NAME ILIKE '#{table.name}'
          AND COLUMN_NAME ILIKE '#{column.name}'
      )) THEN
        #{add_sql};
      END IF;
    END;
    $$
  SQL
  return conditional_sql.rstrip
end

#column_create_sql(column) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 34

def column_create_sql(column)
  modifiers = +""
  if column.unique?
    modifiers << " UNIQUE NOT NULL"
  elsif !column.nullable?
    modifiers << " NOT NULL"
  end
  coltype = COLTYPE_MAP.fetch(column.type)
  colname = self.escape_identifier(column.name)
  return "#{colname} #{coltype}#{modifiers}"
end

#connection(url) ⇒ Object



26
27
28
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 26

def connection(url)
  return SnowsqlConnection.new(url)
end

#create_index_sqlObject

Raises:

  • (NotImplementedError)


30
31
32
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 30

def create_index_sql(*)
  raise NotImplementedError, "Snowflake does not support indices"
end

#identifier_quote_charObject



121
122
123
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 121

def identifier_quote_char
  return ""
end

#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 70

def merge_from_csv(connection, file, table, pk_col, copy_columns)
  raise Webhookdb::InvalidPrecondition, "table must have schema" if table.schema.nil?

  qtable = self.qualify_table(table)

  stage = self.escape_identifier("whdb_tempstage_#{SecureRandom.hex(2)}_#{table.name}")
  stage = self.escape_identifier(table.schema.name) + "." + stage

  pkname = self.escape_identifier(pk_col.name)
  # JSON columns need to be parsed from the CSV, so object columns need parse_json calls.
  col_assigns = self.assign_columns_sql("src", nil, copy_columns) do |c, lhs, rhs|
    if c.type == OBJECT
      [lhs, "parse_json(#{rhs})"]
    else
      [lhs, rhs]
    end
  end
  col_names = copy_columns.map { |c| self.escape_identifier(c.name) }
  col_values = col_names.each_with_index.map do |n, i|
    if copy_columns[i].type == OBJECT
      "parse_json(src.#{n})"
    else
      "src.#{n}"
    end
  end
  col_placeholders = col_names.each_with_index.map { |n, i| "$#{i + 1} #{n}" }
  # Props to https://stackoverflow.com/questions/63084511/snowflake-upsert-from-staged-files
  # for the merge from stage code.
  # The enclosed option is vital because otherwise it doesn't interpret JSON columns properly.
  import_sql = <<~SQL
    CREATE STAGE #{stage} FILE_FORMAT = (type = 'CSV' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

    PUT file://#{file.path} @#{stage} auto_compress=true;

    MERGE INTO #{qtable} AS tgt
      USING (
        SELECT #{col_placeholders.join(', ')} FROM @#{stage}
      ) src
      ON tgt.#{pkname} = src.#{pkname}
      WHEN MATCHED THEN UPDATE SET #{col_assigns}
      WHEN NOT MATCHED THEN INSERT (#{col_names.join(', ')}) values (#{col_values.join(', ')});
  SQL
  connection.execute(import_sql)
end