Class: Webhookdb::DBAdapter::Snowflake
Defined Under Namespace
Classes: SnowsqlConnection
Constant Summary
collapse
- COLTYPE_MAP =
{
BIGINT => "bigint",
BOOLEAN => "boolean",
DATE => "date",
DECIMAL => "numeric",
DOUBLE => "double precision",
FLOAT => "float",
INTEGER => "integer",
OBJECT => "object",
TEXT => "text",
TIMESTAMP => "timestamptz",
}.freeze
Constants included
from DefaultSql
DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS
Constants included
from ColumnTypes
ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID
INVALID_IDENTIFIER_MESSAGE, INVALID_IDENTIFIER_PROMPT, VALID_IDENTIFIER
Instance Attribute Summary
#name, #table, #targets, #unique
Instance Method Summary
collapse
-
#_verify_connection(url, timeout:, statement:) ⇒ Object
-
#add_column_sql(table, column, if_not_exists: false) ⇒ Object
-
#connection(url) ⇒ Object
-
#create_column_sql(column) ⇒ Object
-
#create_index_sql ⇒ Object
-
#create_table_sql(table, columns, if_not_exists: false) ⇒ Object
-
#identifier_quote_char ⇒ Object
-
#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object
Methods included from DefaultSql
#assign_columns_sql, #create_schema_sql, #escape_identifier, #qualify_table
adapter, #create_index_sqls, #create_schema_sql, #escape_identifier, supported_adapters_message, valid_identifier?, validate_identifier!, #verify_connection
Instance Method Details
#_verify_connection(url, timeout:, statement:) ⇒ Object
126
127
128
129
130
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 126
# Executes +statement+ on a connection built from +url+.
#
# @param url value handed to #connection to build the connection
# @param timeout accepted as part of the signature but not used by this adapter
# @param statement value passed to the connection's +execute+
# @return whatever the connection's +execute+ returns
def _verify_connection(url, timeout:, statement:)
  # The keyword is required by the signature; assigning it marks it as intentionally unused.
  _ = timeout
  self.connection(url).execute(statement)
end
|
#add_column_sql(table, column, if_not_exists: false) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 57
# Builds the SQL that adds +column+ to +table+.
#
# With +if_not_exists+ false, this is a plain ALTER TABLE ... ADD COLUMN statement.
# With +if_not_exists+ true, the ALTER is wrapped in an EXECUTE IMMEDIATE block that
# first queries INFORMATION_SCHEMA.COLUMNS and only runs the ALTER when no row matches.
#
# @param table object passed to #qualify_table; +table.name+ and +table.schema.name+
#   are read for the guarded form
# @param column object passed to #create_column_sql; +column.name+ is read for the guarded form
# @param if_not_exists [Boolean] when true, guard the ALTER with an existence check
# @return [String] the SQL text (right-stripped in the guarded form)
def add_column_sql(table, column, if_not_exists: false)
  column_sql = self.create_column_sql(column)
  add_sql = "ALTER TABLE #{self.qualify_table(table)} ADD COLUMN #{column_sql}"
  return add_sql unless if_not_exists
  # NOTE(review): ILIKE is a pattern match, so "_" or "%" inside a name acts as a
  # wildcard in the existence check — confirm this is acceptable for the identifiers used.
  conditional_sql = <<~SQL
    EXECUTE IMMEDIATE $$
    BEGIN
      IF (NOT EXISTS(
        SELECT * FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA ILIKE '#{table.schema.name}'
          AND TABLE_NAME ILIKE '#{table.name}'
          AND COLUMN_NAME ILIKE '#{column.name}'
      )) THEN
        #{add_sql};
      END IF;
    END;
    $$
  SQL
  return conditional_sql.rstrip
end
|
#connection(url) ⇒ Object
26
27
28
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 26
# Creates the connection object this adapter uses.
#
# @param url value passed unchanged to SnowsqlConnection.new
# @return [SnowsqlConnection] a new connection instance
def connection(url)
  SnowsqlConnection.new(url)
end
|
#create_column_sql(column) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 45
# Renders a single column definition: escaped name, mapped type, and constraint suffix.
#
# A unique column gets " UNIQUE NOT NULL"; a non-unique, non-nullable column gets
# " NOT NULL"; otherwise no suffix is added.
#
# @param column object responding to +unique?+, +nullable?+, +type+ and +name+
# @return [String] the column definition fragment
# @raise [KeyError] when +column.type+ has no entry in COLTYPE_MAP
def create_column_sql(column)
  constraint =
    if column.unique?
      " UNIQUE NOT NULL"
    elsif column.nullable?
      ""
    else
      " NOT NULL"
    end
  sql_type = COLTYPE_MAP.fetch(column.type)
  quoted_name = self.escape_identifier(column.name)
  "#{quoted_name} #{sql_type}#{constraint}"
end
|
#create_index_sql ⇒ Object
30
31
32
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 30
# Always raises: this adapter does not generate index SQL.
#
# Accepts and ignores any positional arguments.
#
# @raise [Webhookdb::InvalidPrecondition] on every call
def create_index_sql(*)
  message = "Snowflake does not support indices"
  raise Webhookdb::InvalidPrecondition, message
end
|
#create_table_sql(table, columns, if_not_exists: false) ⇒ Object
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 34
# Builds a CREATE TABLE statement with one column definition per line.
#
# Every column except the last is followed by a comma; the statement is
# joined with newlines and closed with ")".
#
# @param table object passed to #qualify_table
# @param columns [Array] columns rendered through #create_column_sql, in order
# @param if_not_exists [Boolean] when truthy, emit "IF NOT EXISTS" after "CREATE TABLE"
# @return [String] the CREATE TABLE statement
def create_table_sql(table, columns, if_not_exists: false, **)
  existence_guard = if_not_exists ? "IF NOT EXISTS " : ""
  opening = "CREATE TABLE #{existence_guard}#{self.qualify_table(table)} ("
  # All but the final column carry a trailing comma.
  leading = columns[0...-1].to_a.map { |col| " #{self.create_column_sql(col)}," }
  closing_column = " #{self.create_column_sql(columns.last)}"
  [opening, *leading, closing_column, ")"].join("\n")
end
|
#identifier_quote_char ⇒ Object
132
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 132
# Quote character for identifiers in this adapter: the empty string.
#
# @return [String] ""
def identifier_quote_char
  ""
end
|
#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/webhookdb/db_adapter/snowflake.rb', line 81
# Loads a CSV file into +table+ by staging it and merging on the primary key.
#
# A single SQL script is built and passed to +connection.execute+. The script:
# 1. creates a stage named "whdb_tempstage_<4 hex chars>_<table name>" in the table's schema,
# 2. PUTs the local file into that stage,
# 3. MERGEs the staged rows into the table, updating matched rows and inserting the rest.
# Columns whose type is OBJECT are wrapped in parse_json(...) on both the update and insert side.
#
# NOTE(review): the script contains no DROP STAGE, so the stage is not removed here —
# confirm whether cleanup happens elsewhere.
#
# @param connection object responding to +execute+; receives the full script
# @param file object responding to +path+; the path is used in the PUT statement
# @param table object with +name+ and +schema+; must have a non-nil schema
# @param pk_col object with +name+; the column used in the MERGE ON condition
# @param copy_columns [Array] columns (with +name+ and +type+) copied from the CSV, in file order
# @return whatever +connection.execute+ returns
# @raise [Webhookdb::InvalidPrecondition] when +table.schema+ is nil
def merge_from_csv(connection, file, table, pk_col, copy_columns)
  raise Webhookdb::InvalidPrecondition, "table must have schema" if table.schema.nil?
  qtable = self.qualify_table(table)
  # Random suffix keeps the stage name distinct between calls for the same table.
  stage_name = self.escape_identifier("whdb_tempstage_#{SecureRandom.hex(2)}_#{table.name}")
  stage = "#{self.escape_identifier(table.schema.name)}.#{stage_name}"
  pkname = self.escape_identifier(pk_col.name)
  col_assigns = self.assign_columns_sql("src", nil, copy_columns) do |c, lhs, rhs|
    c.type == OBJECT ? [lhs, "parse_json(#{rhs})"] : [lhs, rhs]
  end
  col_names = copy_columns.map { |c| self.escape_identifier(c.name) }
  col_values = copy_columns.zip(col_names).map do |c, n|
    c.type == OBJECT ? "parse_json(src.#{n})" : "src.#{n}"
  end
  # Staged CSV fields are addressed positionally ($1, $2, ...) and aliased to the column names.
  col_placeholders = col_names.each_with_index.map { |n, i| "$#{i + 1} #{n}" }
  import_sql = <<~SQL
    CREATE STAGE #{stage} FILE_FORMAT = (type = 'CSV' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

    PUT file://#{file.path} @#{stage} auto_compress=true;

    MERGE INTO #{qtable} AS tgt
    USING (
      SELECT #{col_placeholders.join(', ')} FROM @#{stage}
    ) src
    ON tgt.#{pkname} = src.#{pkname}
    WHEN MATCHED THEN UPDATE SET #{col_assigns}
    WHEN NOT MATCHED THEN INSERT (#{col_names.join(', ')}) values (#{col_values.join(', ')});
  SQL
  connection.execute(import_sql)
end
|