Class: SeaDuck::Catalog

Inherits:
Object
  • Object
show all
Defined in:
lib/seaduck/catalog.rb

Direct Known Subclasses

GlueCatalog, RestCatalog, S3TablesCatalog

Instance Method Summary collapse

Instance Method Details

#_initialize(url, default_namespace:, attach_options:, secret_options: nil, extensions: []) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/seaduck/catalog.rb', line 3

# Shared setup used to bring up a catalog connection.
#
# Opens a DuckDB database with no path (presumably in-memory -- the final
# step detaches a database named "memory"), installs the "iceberg" extension
# plus any extra +extensions+, optionally creates a secret, attaches the
# catalog at +url+ under the alias "iceberg", and selects the default
# namespace, creating it first when selecting it raises Error.
#
# @param url [String] location handed to attach_with_options
# @param default_namespace namespace to select (and create when missing)
# @param attach_options [Hash] merged over {type: "iceberg"}; may override :type
# @param secret_options [Hash, nil] passed to create_secret when truthy
# @param extensions [Array] additional extension names to install
# @return [Object] whatever the final execute call returns
def _initialize(url, default_namespace:, attach_options:, secret_options: nil, extensions: [])
  @catalog = "iceberg"
  @default_namespace = default_namespace

  @db = DuckDB::Database.open
  @conn = @db.connect

  # The iceberg extension always goes first, then caller-requested extras.
  install_extension("iceberg")
  extensions.each { |name| install_extension(name) }

  create_secret(secret_options) if secret_options

  iceberg_options = {type: "iceberg"}.merge(attach_options)
  attach_with_options(@catalog, url, iceberg_options)

  begin
    use_namespace(@default_namespace)
  rescue Error
    # Selecting the namespace failed; create it and try once more.
    create_namespace(@default_namespace, if_not_exists: true)
    use_namespace(@default_namespace)
  end

  execute("DETACH memory")
end

#attach(alias_, url) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/seaduck/catalog.rb', line 85

# Attaches an external data source read-only under +alias_+.
#
# Only URLs whose scheme is "postgres" or "postgresql" are accepted; for
# those the "postgres" extension is installed before attaching.
#
# @param alias_ name to attach the data source as
# @param url [String] connection URL; its scheme selects the source type
# @raise [ArgumentError] when the URL scheme is not supported
# @return [Object] whatever attach_with_options returns
def attach(alias_, url)
  scheme = URI.parse(url).scheme

  unless ["postgres", "postgresql"].include?(scheme)
    raise ArgumentError, "Unsupported data source type: #{scheme}"
  end

  install_extension("postgres")
  attach_with_options(alias_, url, {type: "postgres", read_only: true})
end

#create_namespace(namespace, if_not_exists: nil) ⇒ Object



30
31
32
33
# File 'lib/seaduck/catalog.rb', line 30

# Issues a CREATE SCHEMA statement for +namespace+.
#
# @param namespace namespace to create; quoted via quote_namespace
# @param if_not_exists when truthy, adds IF NOT EXISTS to the statement
# @return [nil]
def create_namespace(namespace, if_not_exists: nil)
  guard = if_not_exists ? " IF NOT EXISTS" : ""
  execute("CREATE SCHEMA#{guard} #{quote_namespace(namespace)}")
  nil
end

#detach(alias_) ⇒ Object



107
108
109
110
# File 'lib/seaduck/catalog.rb', line 107

# Issues a DETACH statement for the database attached as +alias_+.
#
# @param alias_ attached name; quoted via quote_identifier
# @return [nil]
def detach(alias_)
  target = quote_identifier(alias_)
  execute("DETACH #{target}")
  nil
end

#drop_namespace(namespace, if_exists: nil) ⇒ Object

CASCADE not implemented for Iceberg yet



40
41
42
43
# File 'lib/seaduck/catalog.rb', line 40

# Issues a DROP SCHEMA statement for +namespace+.
#
# CASCADE is not implemented for Iceberg yet, so no CASCADE option is offered.
#
# @param namespace namespace to drop; quoted via quote_namespace
# @param if_exists when truthy, adds IF EXISTS to the statement
# @return [nil]
def drop_namespace(namespace, if_exists: nil)
  guard = if_exists ? " IF EXISTS" : ""
  execute("DROP SCHEMA#{guard} #{quote_namespace(namespace)}")
  nil
end

#drop_table(table_name, if_exists: nil) ⇒ Object



62
63
64
# File 'lib/seaduck/catalog.rb', line 62

# Issues a DROP TABLE statement for +table_name+.
#
# @param table_name table to drop; quoted via quote_table
# @param if_exists when truthy, adds IF EXISTS to the statement
# @return [Object] whatever execute returns for the statement
def drop_table(table_name, if_exists: nil)
  guard = if_exists ? " IF EXISTS" : ""
  execute("DROP TABLE#{guard} #{quote_table(table_name)}")
end

#inspect ⇒ Object

hide internal state



144
145
146
# File 'lib/seaduck/catalog.rb', line 144

# Returns the same string as to_s, so inspecting the object (e.g. in a
# console or an error message) does not dump its instance variables.
def inspect
  to_s
end

#list_namespaces ⇒ Object



26
27
28
# File 'lib/seaduck/catalog.rb', line 26

# Lists the schema names recorded in information_schema.schemata for this
# catalog.
#
# @return [Object] the +rows+ of the query result
def list_namespaces
  result = execute(
    "SELECT schema_name FROM information_schema.schemata WHERE catalog_name = ?",
    [@catalog]
  )
  result.rows
end

#list_tables(namespace = nil) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/seaduck/catalog.rb', line 45

# Lists (table_schema, table_name) pairs from information_schema.tables for
# this catalog, optionally restricted to one namespace.
#
# @param namespace when truthy, only tables in this schema are returned
# @return [Object] the +rows+ of the query result
def list_tables(namespace = nil)
  base_query = "SELECT table_schema, table_name FROM information_schema.tables WHERE table_catalog = ?"

  # Without a namespace the catalog filter is the only condition.
  return execute(base_query, [@catalog]).rows unless namespace

  execute("#{base_query} AND table_schema = ?", [@catalog, namespace]).rows
end

#namespace_exists?(namespace) ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/seaduck/catalog.rb', line 35

# Reports whether information_schema.schemata has a row for +namespace+ in
# this catalog.
#
# @param namespace schema name to look up
# @return [Boolean] result of +any?+ on the query result
def namespace_exists?(namespace)
  matches = execute(
    "SELECT 1 FROM information_schema.schemata WHERE catalog_name = ? AND schema_name = ?",
    [@catalog, namespace]
  )
  matches.any?
end

#quote(value) ⇒ Object

libduckdb does not provide a quoting function. TODO: support more types.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/seaduck/catalog.rb', line 120

# Renders a Ruby value as a SQL literal (libduckdb does not provide a
# function for this).
#
# nil becomes NULL, true/false become the bare words true/false, BigDecimal
# is written in plain ("F") notation, and other Numerics use to_s. Everything
# else is emitted as a single-quoted string with embedded single quotes
# doubled. Time values are rendered in UTC with nanosecond precision,
# DateTime keeps its own offset, and Date is rendered as YYYY-MM-DD.
#
# TODO: support more types
#
# @param value [Object] value to render
# @return [String] SQL literal text
def quote(value)
  if value.nil?
    "NULL"
  elsif value == true
    "true"
  elsif value == false
    "false"
  elsif defined?(BigDecimal) && value.is_a?(BigDecimal)
    value.to_s("F")
  elsif value.is_a?(Numeric)
    value.to_s
  else
    if value.is_a?(Time)
      # getutc returns a new Time. Time#utc would switch the caller's object
      # to UTC in place (and raise FrozenError on a frozen Time).
      value = value.getutc.iso8601(9)
    elsif value.is_a?(DateTime)
      # DateTime must be tested before Date because it is a Date subclass.
      value = value.iso8601(9)
    elsif value.is_a?(Date)
      value = value.strftime("%Y-%m-%d")
    end
    "'#{encoded(value).gsub("'", "''")}'"
  end
end

#quote_identifier(value) ⇒ Object



114
115
116
# File 'lib/seaduck/catalog.rb', line 114

# Wraps +value+ in double quotes for use as a SQL identifier, doubling any
# double quotes it contains.
#
# @param value identifier to quote; passed through the encoded helper first
# @return [String] the quoted identifier
def quote_identifier(value)
  escaped = encoded(value).gsub('"', '""')
  %("#{escaped}")
end

#snapshots(table_name) ⇒ Object



66
67
68
# File 'lib/seaduck/catalog.rb', line 66

# Queries iceberg_snapshots(...) for +table_name+ and passes the result
# through symbolize_keys.
#
# @param table_name table to inspect; quoted via quote_table
# @return [Object] whatever symbolize_keys returns for the query result
def snapshots(table_name)
  target = quote_table(table_name)
  result = execute("SELECT * FROM iceberg_snapshots(#{target})")
  symbolize_keys(result)
end

#sql(sql, params = []) ⇒ Object



70
71
72
# File 'lib/seaduck/catalog.rb', line 70

# Runs an arbitrary SQL statement by forwarding it to execute.
#
# @param sql [String] statement text (this parameter shadows the method name
#   inside the body)
# @param params [Array] bind parameters forwarded unchanged; defaults to none
# @return [Object] whatever execute returns
def sql(sql, params = [])
  execute(sql, params)
end

#table_exists?(table_name) ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
60
# File 'lib/seaduck/catalog.rb', line 57

# Reports whether information_schema.tables has a row for +table_name+ in
# this catalog.
#
# @param table_name table reference; split into namespace and table via
#   split_table
# @return [Boolean] result of +any?+ on the query result
def table_exists?(table_name)
  schema, table = split_table(table_name)
  matches = execute(
    "SELECT 1 FROM information_schema.tables WHERE table_catalog = ? AND table_schema = ? AND table_name = ?",
    [@catalog, schema, table]
  )
  matches.any?
end

#transaction ⇒ Object



74
75
76
77
78
79
80
81
82
83
# File 'lib/seaduck/catalog.rb', line 74

# Runs the given block between BEGIN and COMMIT statements.
#
# If the block (or the COMMIT) raises a StandardError, a ROLLBACK is issued.
# A Rollback error is then swallowed, so raising it inside the block aborts
# the transaction quietly; any other error is re-raised to the caller.
#
# @yield the work to perform inside the transaction
# @return [Object, nil] the result of the COMMIT execute call, or nil when
#   the transaction ended with a Rollback
def transaction
  execute("BEGIN")
  begin
    yield
    execute("COMMIT")
  rescue StandardError => failure
    execute("ROLLBACK")
    # Rollback is only a signal to abort; it is not propagated.
    quiet_abort = failure.is_a?(Rollback)
    raise failure unless quiet_abort
  end
end