Class: SeaDuck::Catalog
- Inherits:
-
Object
show all
- Defined in:
- lib/seaduck/catalog.rb
Instance Method Summary
collapse
Instance Method Details
#_initialize(url, default_namespace:, attach_options:, secret_options: nil, extensions: []) ⇒ Object
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/seaduck/catalog.rb', line 3
# Prepares the DuckDB session backing this catalog.
#
# Opens a DuckDB database and a connection on it, installs the "iceberg"
# extension followed by each entry of +extensions+, creates a secret when
# +secret_options+ is truthy, attaches +url+ under the catalog name
# "iceberg", switches to +default_namespace+ (creating it first if the
# initial switch raises Error), and finally runs "DETACH memory".
#
# @param url location handed to attach_with_options
# @param default_namespace namespace to switch to once attached
# @param attach_options [Hash] merged over {type: "iceberg"}; its keys win
# @param secret_options passed to create_secret when truthy
# @param extensions extra extension names, installed after "iceberg"
# @return [Object] whatever the final execute call returns
def _initialize(url, default_namespace:, attach_options:, secret_options: nil, extensions: [])
  @catalog = "iceberg"
  @default_namespace = default_namespace

  @db = DuckDB::Database.open
  @conn = @db.connect

  install_extension("iceberg")
  extensions.each { |name| install_extension(name) }

  create_secret(secret_options) if secret_options

  iceberg_options = {type: "iceberg"}.merge(attach_options)
  attach_with_options(@catalog, url, iceberg_options)

  begin
    use_namespace(@default_namespace)
  rescue Error
    # The first switch failed: make sure the namespace exists, then try once more.
    create_namespace(@default_namespace, if_not_exists: true)
    use_namespace(@default_namespace)
  end

  execute("DETACH memory")
end
|
#attach(alias_, url) ⇒ Object
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/seaduck/catalog.rb', line 85
# Attaches an external data source, read-only, under the name +alias_+.
#
# The source kind is chosen from the URL scheme; only "postgres" and
# "postgresql" are recognised. The matching extension is installed before
# attaching.
#
# @param alias_ name to attach the source under
# @param url [String] connection URL; its scheme selects the source type
# @return [Object] whatever attach_with_options returns
# @raise [ArgumentError] if the URL scheme is not supported
def attach(alias_, url)
  scheme = URI.parse(url).scheme

  type, extension =
    case scheme
    when "postgres", "postgresql"
      ["postgres", "postgres"]
    else
      raise ArgumentError, "Unsupported data source type: #{scheme}"
    end

  install_extension(extension)
  attach_with_options(alias_, url, {type: type, read_only: true})
end
|
#create_namespace(namespace, if_not_exists: nil) ⇒ Object
30
31
32
33
|
# File 'lib/seaduck/catalog.rb', line 30
# Issues a CREATE SCHEMA statement for +namespace+.
#
# @param namespace namespace to create; quoted via quote_namespace
# @param if_not_exists when truthy, adds IF NOT EXISTS to the statement
# @return [nil]
def create_namespace(namespace, if_not_exists: nil)
  guard = if_not_exists ? " IF NOT EXISTS" : ""
  execute("CREATE SCHEMA#{guard} #{quote_namespace(namespace)}")
  nil
end
|
#detach(alias_) ⇒ Object
107
108
109
110
|
# File 'lib/seaduck/catalog.rb', line 107
# Issues a DETACH statement for the database attached as +alias_+.
#
# @param alias_ attached name; quoted via quote_identifier
# @return [nil]
def detach(alias_)
  target = quote_identifier(alias_)
  execute("DETACH #{target}")
  nil
end
|
#drop_namespace(namespace, if_exists: nil) ⇒ Object
Note: CASCADE is not implemented for Iceberg yet.
40
41
42
43
|
# File 'lib/seaduck/catalog.rb', line 40
# Issues a DROP SCHEMA statement for +namespace+ (without CASCADE).
#
# @param namespace namespace to drop; quoted via quote_namespace
# @param if_exists when truthy, adds IF EXISTS to the statement
# @return [nil]
def drop_namespace(namespace, if_exists: nil)
  guard = if_exists ? " IF EXISTS" : ""
  execute("DROP SCHEMA#{guard} #{quote_namespace(namespace)}")
  nil
end
|
#drop_table(table_name, if_exists: nil) ⇒ Object
62
63
64
|
# File 'lib/seaduck/catalog.rb', line 62
# Issues a DROP TABLE statement for +table_name+.
#
# @param table_name table to drop; quoted via quote_table
# @param if_exists when truthy, adds IF EXISTS to the statement
# @return [Object] whatever execute returns for the statement
def drop_table(table_name, if_exists: nil)
  guard = if_exists ? " IF EXISTS" : ""
  execute("DROP TABLE#{guard} #{quote_table(table_name)}")
end
|
#inspect ⇒ Object
144
145
146
|
# File 'lib/seaduck/catalog.rb', line 144
# Returns the same string as #to_s.
# NOTE(review): presumably overridden so the default inspect output does not
# dump instance variables such as the connection — confirm intent.
def inspect
to_s
end
|
#list_namespaces ⇒ Object
26
27
28
|
# File 'lib/seaduck/catalog.rb', line 26
# Lists the schema names recorded in information_schema.schemata for this
# catalog.
#
# @return [Object] the +rows+ of the query result
def list_namespaces
  query = "SELECT schema_name FROM information_schema.schemata WHERE catalog_name = ?"
  execute(query, [@catalog]).rows
end
|
#list_tables(namespace = nil) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/seaduck/catalog.rb', line 45
# Lists (schema, table) pairs from information_schema.tables for this
# catalog, optionally restricted to one namespace.
#
# @param namespace when truthy, only tables in this schema are returned
# @return [Object] the +rows+ of the query result
def list_tables(namespace = nil)
  base = "SELECT table_schema, table_name FROM information_schema.tables WHERE table_catalog = ?"
  return execute(base, [@catalog]).rows unless namespace

  execute("#{base} AND table_schema = ?", [@catalog, namespace]).rows
end
|
#namespace_exists?(namespace) ⇒ Boolean
35
36
37
|
# File 'lib/seaduck/catalog.rb', line 35
# Checks information_schema.schemata for a schema named +namespace+ in this
# catalog.
#
# @param namespace schema name to look up
# @return [Boolean] result of +any?+ on the query result
def namespace_exists?(namespace)
  lookup = "SELECT 1 FROM information_schema.schemata WHERE catalog_name = ? AND schema_name = ?"
  matches = execute(lookup, [@catalog, namespace])
  matches.any?
end
|
#quote(value) ⇒ Object
libduckdb does not provide a quoting function. TODO: support more types.
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/seaduck/catalog.rb', line 120
# Renders a Ruby value as a SQL literal.
#
# nil, true and false map to NULL, true and false; numerics are emitted
# unquoted (BigDecimal in plain decimal notation); Time, DateTime and Date
# are formatted as text; everything else is passed through +encoded+ and
# wrapped in single quotes with embedded single quotes doubled.
#
# The caller's object is never modified: Time values are converted with
# Time#getutc, which returns a new Time, rather than Time#utc, which
# converts the receiver in place (and raises on a frozen Time).
#
# @param value [Object] value to render
# @return [String] SQL literal text
def quote(value)
  case value
  when nil
    "NULL"
  when true
    "true"
  when false
    "false"
  when Numeric
    # BigDecimal#to_s defaults to exponent notation; "F" forces plain decimal.
    defined?(BigDecimal) && value.is_a?(BigDecimal) ? value.to_s("F") : value.to_s
  else
    text =
      case value
      when Time
        value.getutc.iso8601(9)
      when DateTime
        # DateTime is a Date subclass, so it must be matched before Date.
        value.iso8601(9)
      when Date
        value.strftime("%Y-%m-%d")
      else
        value
      end
    "'#{encoded(text).gsub("'", "''")}'"
  end
end
|
#quote_identifier(value) ⇒ Object
114
115
116
|
# File 'lib/seaduck/catalog.rb', line 114
# Wraps +value+ in double quotes for use as a SQL identifier, doubling any
# embedded double quotes. The value is passed through +encoded+ first.
#
# @param value identifier to quote
# @return [String] the quoted identifier
def quote_identifier(value)
  escaped = encoded(value).gsub('"', '""')
  %("#{escaped}")
end
|
#snapshots(table_name) ⇒ Object
66
67
68
|
# File 'lib/seaduck/catalog.rb', line 66
# Queries iceberg_snapshots for +table_name+ and passes the result through
# symbolize_keys.
#
# @param table_name table to inspect; quoted via quote_table
# @return [Object] whatever symbolize_keys returns for the query result
def snapshots(table_name)
  target = quote_table(table_name)
  result = execute("SELECT * FROM iceberg_snapshots(#{target})")
  symbolize_keys(result)
end
|
#sql(sql, params = []) ⇒ Object
70
71
72
|
# File 'lib/seaduck/catalog.rb', line 70
# Runs +sql+ with optional bind +params+ by delegating directly to execute,
# and returns whatever execute returns.
def sql(sql, params = [])
execute(sql, params)
end
|
#table_exists?(table_name) ⇒ Boolean
57
58
59
60
|
# File 'lib/seaduck/catalog.rb', line 57
# Checks information_schema.tables for +table_name+ in this catalog.
#
# The name is first split into namespace and table parts by split_table.
#
# @param table_name table name handed to split_table
# @return [Boolean] result of +any?+ on the query result
def table_exists?(table_name)
  schema, name = split_table(table_name)
  lookup = "SELECT 1 FROM information_schema.tables WHERE table_catalog = ? AND table_schema = ? AND table_name = ?"
  execute(lookup, [@catalog, schema, name]).any?
end
|
#transaction ⇒ Object
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/seaduck/catalog.rb', line 74
# Runs the given block between BEGIN and COMMIT.
#
# If the block (or the COMMIT) raises a StandardError, ROLLBACK is issued.
# A Rollback error is then swallowed and nil is returned; any other error
# is re-raised to the caller.
#
# @yield the statements to run inside the transaction
# @return [Object, nil] the result of executing COMMIT (not the block's
#   value), or nil when the block raised Rollback
def transaction
  execute("BEGIN")
  begin
    yield
    execute("COMMIT")
  rescue StandardError => error
    execute("ROLLBACK")
    # Rollback only signals "abort quietly"; it is not propagated.
    return if error.is_a?(Rollback)

    raise error
  end
end
|