Class: Polars::Catalog

Inherits:
Object
Defined in:
lib/polars/catalog.rb,
lib/polars/catalog/unity/table_info.rb,
lib/polars/catalog/unity/column_info.rb,
lib/polars/catalog/unity/catalog_info.rb,
lib/polars/catalog/unity/namespace_info.rb

Overview

Unity Catalog client.

Defined Under Namespace

Modules: Unity

Constructor Details

#initialize(workspace_url, bearer_token: "auto", require_https: true) ⇒ Catalog

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Initialize a catalog client.

Parameters:

  • workspace_url (String)

    URL of the workspace, or alternatively the URL of the Unity Catalog API endpoint.

  • bearer_token (String) (defaults to: "auto")

    Bearer token to authenticate with. This can also be set to:

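    • "auto": Automatically retrieve bearer tokens from the environment.
    • "databricks-sdk": Use the Databricks SDK to retrieve and use the bearer token from the environment.

    In the Ruby source below, only "auto" is special-cased (it is converted to nil before RbCatalogClient is created); every other value, including "databricks-sdk", is passed to the client unchanged.
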
  • require_https (Boolean) (defaults to: true)

    Require the workspace_url to use HTTPS.



# File 'lib/polars/catalog.rb', line 20

def initialize(workspace_url, bearer_token: "auto", require_https: true)
  if require_https && !workspace_url.start_with?("https://")
    msg = (
      "a non-HTTPS workspace_url was given. To " +
      "allow non-HTTPS URLs, pass require_https: false."
    )
    raise ArgumentError, msg
  end

  if bearer_token == "auto"
    bearer_token = nil
  end

  @client = RbCatalogClient.new(workspace_url, bearer_token)
end
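The constructor's argument handling can be exercised without a live workspace. The sketch below copies the two checks above into a standalone helper, normalize_catalog_args, which is hypothetical and not part of Polars: it rejects non-HTTPS URLs unless require_https: false is passed, and maps "auto" to nil before anything would reach RbCatalogClient. The workspace URLs are placeholders.

```ruby
# Hypothetical helper (not part of Polars) that mirrors the argument
# handling in Polars::Catalog#initialize, so it can run without a server.
def normalize_catalog_args(workspace_url, bearer_token: "auto", require_https: true)
  if require_https && !workspace_url.start_with?("https://")
    raise ArgumentError,
      "a non-HTTPS workspace_url was given. To allow non-HTTPS URLs, pass require_https: false."
  end

  # "auto" is mapped to nil before the native client would be constructed
  bearer_token = nil if bearer_token == "auto"

  [workspace_url, bearer_token]
end

p normalize_catalog_args("https://example.cloud.databricks.com")
# prints ["https://example.cloud.databricks.com", nil]

begin
  normalize_catalog_args("http://localhost:8080")
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end

# A local, non-HTTPS server has to opt out explicitly
p normalize_catalog_args("http://localhost:8080", bearer_token: "my-token", require_https: false)
# prints ["http://localhost:8080", "my-token"]
```

Raising before the client is built means a misconfigured URL fails immediately, rather than on the first request.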

Instance Method Details

#create_catalog(catalog_name, comment: nil, storage_root: nil) ⇒ CatalogInfo

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Create a catalog.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • comment (String) (defaults to: nil)

    Leaves a comment about the catalog.

  • storage_root (String) (defaults to: nil)

    Base location at which to store the catalog.

Returns:

  • (CatalogInfo)


# File 'lib/polars/catalog.rb', line 269

def create_catalog(catalog_name, comment: nil, storage_root: nil)
  @client.create_catalog(catalog_name, comment, storage_root)
end

#create_namespace(catalog_name, namespace, comment: nil, storage_root: nil) ⇒ NamespaceInfo

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Create a namespace (unity schema) in the catalog.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • comment (String) (defaults to: nil)

    Leaves a comment about the namespace.

  • storage_root (String) (defaults to: nil)

    Base location at which to store the namespace.

Returns:

  • (NamespaceInfo)


# File 'lib/polars/catalog.rb', line 309

def create_namespace(
  catalog_name,
  namespace,
  comment: nil,
  storage_root: nil
)
  @client.create_namespace(
    catalog_name,
    namespace,
    comment,
    storage_root
  )
end

#create_table(catalog_name, namespace, table_name, schema:, table_type:, data_source_format: nil, comment: nil, storage_root: nil, properties: nil) ⇒ TableInfo

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Create a table in the catalog.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • table_name (String)

    Name of the table.

  • schema (Object)

    Schema of the table.

  • table_type (Object)

    Type of the table.

  • data_source_format (Object) (defaults to: nil)

    Storage format of the table.

  • comment (String) (defaults to: nil)

    Leaves a comment about the table.

  • storage_root (String) (defaults to: nil)

    Base location at which to store the table.

  • properties (Hash) (defaults to: nil)

    Extra key-value metadata to store.

Returns:

  • (TableInfo)


# File 'lib/polars/catalog.rb', line 375

def create_table(
  catalog_name,
  namespace,
  table_name,
  schema:,
  table_type:,
  data_source_format: nil,
  comment: nil,
  storage_root: nil,
  properties: nil
)
  @client.create_table(
    catalog_name,
    namespace,
    table_name,
    schema,
    table_type,
    data_source_format,
    comment,
    storage_root,
    (properties || {}).to_a
  )
end
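create_table accepts properties as a Hash but hands the native client an Array of key-value pairs, via (properties || {}).to_a. The snippet below shows that conversion in isolation; to_pair_list is a hypothetical name used only for illustration, and the property keys and values are made up.

```ruby
# Hypothetical helper reproducing the conversion create_table applies to `properties`
def to_pair_list(properties)
  # nil is treated like an empty Hash, so the client always receives an Array
  (properties || {}).to_a
end

p to_pair_list(nil)
# prints []
p to_pair_list({"owner" => "data-eng", "retention_days" => "30"})
# prints [["owner", "data-eng"], ["retention_days", "30"]]
```

Because Ruby Hashes preserve insertion order, the pairs arrive at the client in the order they were written.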

#delete_catalog(catalog_name, force: false) ⇒ Object

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Delete a catalog.

Note that depending on the table type and catalog server, this may not delete the actual data files from storage. For more details, please consult the documentation of the catalog provider you are using.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • force (Boolean) (defaults to: false)

    Forcibly delete the catalog even if it is not empty.

Returns:

  • (Object)

# File 'lib/polars/catalog.rb', line 289

def delete_catalog(catalog_name, force: false)
  @client.delete_catalog(catalog_name, force)
end

#delete_namespace(catalog_name, namespace, force: false) ⇒ Object

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Delete a namespace (unity schema) in the catalog.

Note that depending on the table type and catalog server, this may not delete the actual data files from storage. For more details, please consult the documentation of the catalog provider you are using.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • force (Boolean) (defaults to: false)

    Forcibly delete the namespace even if it is not empty.

Returns:

  • (Object)

# File 'lib/polars/catalog.rb', line 341

def delete_namespace(
  catalog_name,
  namespace,
  force: false
)
  @client.delete_namespace(catalog_name, namespace, force)
end

#delete_table(catalog_name, namespace, table_name) ⇒ Object

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Delete the table stored at this location.

Note that depending on the table type and catalog server, this may not delete the actual data files from storage. For more details, please consult the documentation of the catalog provider you are using.

If you would like to perform manual deletions, the storage location of the files can be found using get_table_info.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • table_name (String)

    Name of the table.

Returns:

  • (Object)

# File 'lib/polars/catalog.rb', line 420

def delete_table(
  catalog_name,
  namespace,
  table_name
)
  @client.delete_table(
    catalog_name,
    namespace,
    table_name
  )
end
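Because delete_table may leave data files behind, it can help to record the table's storage location before removing the catalog entry. The sketch below does this against a hypothetical in-memory FakeCatalog that uses the same method names as Polars::Catalog; FakeTableInfo is a stand-in for the real TableInfo, and its field names (including storage_location) are assumptions made for illustration.

```ruby
# Stand-in for Polars::Catalog::Unity::TableInfo; field names are assumptions.
FakeTableInfo = Struct.new(:name, :storage_location, :data_source_format, keyword_init: true)

# Hypothetical in-memory client with the same method names as Polars::Catalog
class FakeCatalog
  def initialize
    @tables = {
      ["main", "sales", "orders"] => FakeTableInfo.new(
        name: "orders",
        storage_location: "s3://bucket/sales/orders",
        data_source_format: "DELTA"
      )
    }
  end

  def get_table_info(catalog_name, namespace, table_name)
    @tables.fetch([catalog_name, namespace, table_name])
  end

  def delete_table(catalog_name, namespace, table_name)
    @tables.delete([catalog_name, namespace, table_name])
    nil
  end
end

# Look up where the files live *before* deleting the catalog entry,
# so they can be cleaned up manually if the server leaves them behind.
def delete_table_and_report_location(catalog, catalog_name, namespace, table_name)
  location = catalog.get_table_info(catalog_name, namespace, table_name).storage_location
  catalog.delete_table(catalog_name, namespace, table_name)
  location
end

catalog = FakeCatalog.new
puts delete_table_and_report_location(catalog, "main", "sales", "orders")
# prints s3://bucket/sales/orders
```

The order matters: once the entry is deleted, get_table_info can no longer be used to find the files.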

#get_table_info(catalog_name, namespace, table_name) ⇒ TableInfo

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Retrieve the metadata of the specified table.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • table_name (String)

    Name of the table.

Returns:

  • (TableInfo)


# File 'lib/polars/catalog.rb', line 91

def get_table_info(catalog_name, namespace, table_name)
  @client.get_table_info(catalog_name, namespace, table_name)
end

#list_catalogs ⇒ Array

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

List the available catalogs.

Returns:

  • (Array)

# File 'lib/polars/catalog.rb', line 43

def list_catalogs
  @client.list_catalogs
end

#list_namespaces(catalog_name) ⇒ Array

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

List the available namespaces (unity schema) under the specified catalog.

Parameters:

  • catalog_name (String)

    Name of the catalog.

Returns:

  • (Array)

# File 'lib/polars/catalog.rb', line 57

def list_namespaces(catalog_name)
  @client.list_namespaces(catalog_name)
end

#list_tables(catalog_name, namespace) ⇒ Array

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

List the available tables under the specified namespace (unity schema).

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

Returns:

  • (Array)

# File 'lib/polars/catalog.rb', line 73

def list_tables(catalog_name, namespace)
  @client.list_tables(catalog_name, namespace)
end
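The three list methods address successive levels of the catalog, namespace, table hierarchy, so a full inventory can be built by nesting them. The sketch assumes a hypothetical in-memory StubCatalog whose list methods return plain name strings; the real methods return Arrays of metadata objects, so the name extraction would need adapting. The catalog, namespace and table names are made up.

```ruby
# Hypothetical stub exposing the same list methods as Polars::Catalog,
# but returning plain names instead of metadata objects.
class StubCatalog
  DATA = {
    "main" => {"sales" => ["orders", "customers"], "hr" => ["employees"]},
    "dev" => {}
  }.freeze

  def list_catalogs
    DATA.keys
  end

  def list_namespaces(catalog_name)
    DATA.fetch(catalog_name).keys
  end

  def list_tables(catalog_name, namespace)
    DATA.fetch(catalog_name).fetch(namespace)
  end
end

# Walk catalog -> namespace -> table and collect fully qualified names
def inventory(catalog)
  catalog.list_catalogs.flat_map do |c|
    catalog.list_namespaces(c).flat_map do |ns|
      catalog.list_tables(c, ns).map { |t| "#{c}.#{ns}.#{t}" }
    end
  end
end

puts inventory(StubCatalog.new)
# prints main.sales.orders, main.sales.customers and main.hr.employees, one per line
```

An empty catalog such as "dev" simply contributes nothing, because flat_map flattens away its empty result.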

#scan_table(catalog_name, namespace, table_name, delta_table_version: nil, delta_table_options: nil, storage_options: nil) ⇒ LazyFrame

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Scan the specified table into a LazyFrame.

Parameters:

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • table_name (String)

    Name of the table.

  • delta_table_version (Object) (defaults to: nil)

    Version of the table to scan (Delta Lake only).

  • delta_table_options (Hash) (defaults to: nil)

    Additional options used when reading a Delta Lake table.

  • storage_options (Hash) (defaults to: nil)

    Options that indicate how to connect to a cloud provider.

    The cloud providers currently supported are AWS, GCP, and Azure; Hugging Face (hf://) paths are also accepted. See the provider documentation for the supported keys:

    • aws
    • gcp
    • azure
    • Hugging Face (hf://): Accepts an API key under the token parameter: {"token" => "..."}, or by setting the HF_TOKEN environment variable.

    If storage_options is not provided, Polars will try to infer the information from environment variables.

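Returns:

  • (LazyFrame)

Raises:

  • (ArgumentError)

    If delta_table_version or delta_table_options is given for a table whose data source format is not DELTA or DELTASHARING.

  • (Todo)

    If the table's data source format is not DELTA or DELTASHARING; scanning other formats is not yet implemented.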


# File 'lib/polars/catalog.rb', line 127

def scan_table(
  catalog_name,
  namespace,
  table_name,
  delta_table_version: nil,
  delta_table_options: nil,
  storage_options: nil
)
  table_info = get_table_info(catalog_name, namespace, table_name)
  storage_location, data_source_format = _extract_location_and_data_format(
    table_info, "scan table"
  )

  if ["DELTA", "DELTASHARING"].include?(data_source_format)
    return Polars.scan_delta(
      storage_location,
      version: delta_table_version,
      delta_table_options: delta_table_options,
      storage_options: storage_options
    )
  end

  if !delta_table_version.nil?
    msg = (
      "cannot apply delta_table_version for table of type " +
      "#{data_source_format}"
    )
    raise ArgumentError, msg
  end

  if !delta_table_options.nil?
    msg = (
      "cannot apply delta_table_options for table of type " +
      "#{data_source_format}"
    )
    raise ArgumentError, msg
  end

  if storage_options&.any?
    storage_options = storage_options.to_a
  else
    # Treat an empty Hash the same as no storage options
    storage_options = nil
  end

  raise Todo
end
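The branching in scan_table can be summarized without a catalog server. The sketch below uses two hypothetical helpers that are not part of Polars: scan_plan mirrors how the table's data_source_format chooses between Polars.scan_delta, an ArgumentError for Delta-only options, and the not-yet-implemented path that raises Todo; normalize_storage_options mirrors how an empty Hash is treated like nil. Format strings other than DELTA and DELTASHARING are illustrative.

```ruby
DELTA_FORMATS = ["DELTA", "DELTASHARING"].freeze

# Hypothetical helper mirroring scan_table's branching. Returns :scan_delta
# for Delta tables; otherwise rejects Delta-only options and returns
# :unsupported where scan_table itself would raise Todo.
def scan_plan(data_source_format, delta_table_version: nil, delta_table_options: nil)
  return :scan_delta if DELTA_FORMATS.include?(data_source_format)

  unless delta_table_version.nil?
    raise ArgumentError, "cannot apply delta_table_version for table of type #{data_source_format}"
  end
  unless delta_table_options.nil?
    raise ArgumentError, "cannot apply delta_table_options for table of type #{data_source_format}"
  end

  :unsupported
end

# Hypothetical helper: an empty Hash behaves like nil, otherwise a Hash becomes key-value pairs
def normalize_storage_options(storage_options)
  if storage_options&.any?
    storage_options.to_a
  else
    nil
  end
end

p scan_plan("DELTA", delta_table_version: 3)   # prints :scan_delta
p scan_plan("PARQUET")                         # prints :unsupported

begin
  scan_plan("PARQUET", delta_table_version: 3)
rescue ArgumentError => e
  puts e.message
end
# prints cannot apply delta_table_version for table of type PARQUET

p normalize_storage_options({})                # prints nil
```

Note that the Delta-only checks test for nil, so even an empty delta_table_options Hash is rejected for a non-Delta table.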

#write_table(df, catalog_name, namespace, table_name, delta_mode: "error", delta_write_options: nil, delta_merge_options: nil, storage_options: nil) ⇒ Object

Note:

This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.

Write a DataFrame to a catalog table.

Parameters:

  • df (DataFrame)

    DataFrame to write.

  • catalog_name (String)

    Name of the catalog.

  • namespace (String)

    Name of the namespace (unity schema).

  • table_name (String)

    Name of the table.

  • delta_mode ('error', 'append', 'overwrite', 'ignore', 'merge') (defaults to: "error")

    (For delta tables) How to handle existing data.

    • If 'error', raise an error if the table already exists (default).
    • If 'append', add the new data to the table.
    • If 'overwrite', replace the table with the new data.
    • If 'ignore', write nothing if the table already exists.
    • If 'merge', return a TableMerger object for merging the data from the DataFrame with the existing data.
  • delta_write_options (Hash) (defaults to: nil)

    (For Delta tables) Additional options used when writing a Delta Lake table. See the Delta Lake writer documentation for the supported write options.

  • delta_merge_options (Hash) (defaults to: nil)

    (For Delta tables) Options required to MERGE into a Delta Lake table (used when delta_mode is 'merge'). See the Delta Lake documentation for the supported merge options.

  • storage_options (Hash) (defaults to: nil)

    Options that indicate how to connect to a cloud provider.

    The cloud providers currently supported are AWS, GCP, and Azure; Hugging Face (hf://) paths are also accepted. See the provider documentation for the supported keys:

    • aws
    • gcp
    • azure
    • Hugging Face (hf://): Accepts an API key under the token parameter: {"token" => "..."}, or by setting the HF_TOKEN environment variable.

    If storage_options is not provided, Polars will try to infer the information from environment variables.

Returns:

  • (Object)

# File 'lib/polars/catalog.rb', line 222

def write_table(
  df,
  catalog_name,
  namespace,
  table_name,
  delta_mode: "error",
  delta_write_options: nil,
  delta_merge_options: nil,
  storage_options: nil
)
  table_info = get_table_info(catalog_name, namespace, table_name)
  storage_location, data_source_format = _extract_location_and_data_format(
    table_info, "write table"
  )

  if ["DELTA", "DELTASHARING"].include?(data_source_format)
    df.write_delta(
      storage_location,
      storage_options: storage_options,
      mode: delta_mode,
      delta_write_options: delta_write_options,
      delta_merge_options: delta_merge_options
    )
  else
    msg = (
      "write_table: table format of " +
      "#{catalog_name}.#{namespace}.#{table_name} " +
      "(#{data_source_format}) is unsupported."
    )
    raise NotImplementedError, msg
  end
end
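write_table only writes when the table's data_source_format is DELTA or DELTASHARING, and it passes delta_mode straight through to DataFrame#write_delta. A caller who wants to fail fast could check both before doing any work. The helper below, check_write_table_args, is hypothetical: Polars performs the format check itself but, in the code shown, does not validate delta_mode. The accepted modes are the ones listed in the parameter description.

```ruby
# The modes documented for write_table's delta_mode parameter
DELTA_MODES = ["error", "append", "overwrite", "ignore", "merge"].freeze

# Hypothetical pre-flight check (not part of Polars): reject non-Delta tables
# with the same message write_table uses, and catch mistyped modes early.
def check_write_table_args(catalog_name, namespace, table_name, data_source_format, delta_mode: "error")
  unless ["DELTA", "DELTASHARING"].include?(data_source_format)
    raise NotImplementedError,
      "write_table: table format of #{catalog_name}.#{namespace}.#{table_name} (#{data_source_format}) is unsupported."
  end
  unless DELTA_MODES.include?(delta_mode)
    raise ArgumentError, "unknown delta_mode #{delta_mode.inspect}; expected one of #{DELTA_MODES.join(", ")}"
  end
  true
end

p check_write_table_args("main", "sales", "orders", "DELTA", delta_mode: "append")   # prints true

begin
  check_write_table_args("main", "sales", "events", "PARQUET")
rescue NotImplementedError => e
  puts e.message
end
# prints write_table: table format of main.sales.events (PARQUET) is unsupported.
```

NotImplementedError is a ScriptError rather than a StandardError, so a bare rescue clause will not catch the error write_table raises; it has to be rescued by name.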