Class: Egis::Database

Inherits:
Object
  • Object
show all
Defined in:
lib/egis/database.rb

Overview

Interface for database manipulation and querying.

Extends the interface of Client but all the queries scheduled using Database are executed within the database’s context. SQL table references without explicit database will implicitly refer to the database they are executed from.

It is recommended to create database objects using Client#database method.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, client: Egis::Client.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client)) ⇒ Database

Returns a new instance of Database.



17
18
19
20
21
# File 'lib/egis/database.rb', line 17

# Builds a database handle from its name and collaborators. Only stores the
# arguments; nothing is sent to Athena from here.
#
# @param [String] name Athena database name
# @param client object used to execute queries (a new Egis::Client by default)
# @param output_downloader object used to fetch query output
#   (by default an Egis::OutputDownloader built from the client's S3 client)
def initialize(name, client: Egis::Client.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client))
  @output_downloader = output_downloader
  @client = client
  @name = name
end

Instance Attribute Details

#name ⇒ String (readonly)

Returns Athena database name.

Returns:

  • (String)

    Athena database name



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/egis/database.rb', line 16

##
# Interface for database manipulation and querying.
#
# Queries run through {#execute_query} are passed to the client with this
# database's name as the +database+ option, so unqualified table references
# resolve against it.
class Database
  ##
  # @param [String] name Athena database name
  # @param client object used to execute queries (a new {Egis::Client} by default)
  # @param output_downloader object used to fetch query output
  #   (by default an {Egis::OutputDownloader} built from the client's S3 client)
  def initialize(name, client: Egis::Client.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client))
    @client = client
    @output_downloader = output_downloader
    @name = name
  end

  ##
  # @return [String] Athena database name
  attr_reader :name

  ##
  # Creates {Egis::Table} object. Executing it doesn't create Athena table yet.
  #
  # @param [String] table_name
  # @param [Egis::TableSchema] table_schema
  # @param [String] table_location S3 URL with table location (e.g. `s3://s3_bucket/table/location/`)
  # @param [:tsv, :csv, :orc, :orc_index_access, :json, String] format Table Format (defaults to :tsv)
  # @return [Egis::Table]
  def table(table_name, table_schema, table_location, **options)
    Table.new(self, table_name, table_schema, table_location, client: client, options: options)
  end

  ##
  # Creates database in Athena.
  #
  # @return [void]
  def create
    log_database_creation

    client.execute_query("CREATE DATABASE IF NOT EXISTS #{translate_name(name)};", async: false,
                                                                                   system_execution: true)
  end

  ##
  # The same as {#create} but raising error if it already exists.
  #
  # @return [void]
  def create!
    log_database_creation

    client.execute_query("CREATE DATABASE #{translate_name(name)};", async: false, system_execution: true)
  end

  ##
  # Removes database in Athena.
  #
  # @return [void]
  def drop
    log_database_removal

    client.execute_query("DROP DATABASE IF EXISTS #{translate_name(name)} CASCADE;", async: false,
                                                                                     system_execution: true)
  end

  ##
  # The same as {#drop} but raising error if the database does not exist.
  #
  # @return [void]
  def drop!
    log_database_removal

    client.execute_query("DROP DATABASE #{translate_name(name)} CASCADE;", async: false, system_execution: true)
  end

  ##
  # (see Egis::Client#execute_query)
  def execute_query(query, **options)
    # Caller-supplied options are merged last, so an explicit :database wins.
    client.execute_query(query, **{database: name, **options})
  end

  ##
  # (see Egis::Client#query_status)
  def query_status(query_id)
    client.query_status(query_id)
  end

  ##
  # Checks whether database with such name exists in Athena.
  #
  # @return [Boolean]
  def exists?
    # {#create} and {#drop} address the database by its translated name, so the
    # lookup must use that same name for both the pattern and the match.
    athena_name = translate_name(name)
    query_status = client.execute_query("SHOW DATABASES LIKE '#{athena_name}';", async: false,
                                                                                 system_execution: true)
    parsed_result = output_downloader.download(query_status.output_location)
    parsed_result.flatten.include?(athena_name)
  end

  private

  attr_reader :client, :output_downloader

  # Logs the upcoming database creation at info level.
  def log_database_creation
    Egis.logger.info { "Creating database #{name}" }
  end

  # Logs the upcoming database removal at info level.
  def log_database_removal
    Egis.logger.info { "Removing database #{name}" }
  end

  # Maps the logical database name to the name used in SQL statements,
  # as decided by the current Egis mode.
  def translate_name(name)
    Egis.mode.database_name(name)
  end
end

Instance Method Details

#create ⇒ void

This method returns an undefined value.

Creates database in Athena.



43
44
45
46
47
48
# File 'lib/egis/database.rb', line 43

# Creates the database in Athena; the statement uses IF NOT EXISTS, so an
# already existing database is not an error. Runs synchronously.
#
# @return [void]
def create
  log_database_creation

  statement = "CREATE DATABASE IF NOT EXISTS #{translate_name(name)};"
  client.execute_query(statement, async: false, system_execution: true)
end

#create! ⇒ void

This method returns an undefined value.

The same as #create but raising error if it already exists.



55
56
57
58
59
# File 'lib/egis/database.rb', line 55

# Creates the database in Athena with a plain CREATE DATABASE statement
# (no IF NOT EXISTS guard). Runs synchronously.
#
# @return [void]
def create!
  log_database_creation

  statement = "CREATE DATABASE #{translate_name(name)};"
  client.execute_query(statement, async: false, system_execution: true)
end

#drop ⇒ void

This method returns an undefined value.

Removes database in Athena.



66
67
68
69
70
71
# File 'lib/egis/database.rb', line 66

# Drops the database (CASCADE) in Athena; the statement uses IF EXISTS, so a
# missing database is not an error. Runs synchronously.
#
# @return [void]
def drop
  log_database_removal

  statement = "DROP DATABASE IF EXISTS #{translate_name(name)} CASCADE;"
  client.execute_query(statement, async: false, system_execution: true)
end

#drop! ⇒ void

This method returns an undefined value.

The same as #drop but raises an error if the database does not exist.



78
79
80
81
82
# File 'lib/egis/database.rb', line 78

# Drops the database (CASCADE) in Athena with a plain DROP DATABASE statement
# (no IF EXISTS guard). Runs synchronously.
#
# @return [void]
def drop!
  log_database_removal

  statement = "DROP DATABASE #{translate_name(name)} CASCADE;"
  client.execute_query(statement, async: false, system_execution: true)
end

#execute_query(query, **options) ⇒ Egis::QueryStatus

Executes Athena query. By default, queries are being executed asynchronously.

Parameters:

  • query (String)

    SQL query to execute

  • async (Boolean)

    Decide whether you want to run query asynchronously or block execution until it finishes

  • work_group (String)

    Change Athena work group the query will be executed in.

  • database (String)

    Run query in the context of a specific database (implicit table references are expected to be in given database).

  • output_location (String)

    S3 URL of the desired output location. By default, Athena uses the location defined by the workgroup.

Returns:



87
88
89
# File 'lib/egis/database.rb', line 87

# Forwards the query to the client with this database's name as the default
# +database+ option. An explicit +database+ in +options+ takes precedence.
#
# @param [String] query SQL query to execute
# @param options remaining keyword options, passed through to the client
# @return whatever the client's execute_query returns
def execute_query(query, **options)
  scoped_options = {database: name}.merge(options)
  client.execute_query(query, **scoped_options)
end

#exists? ⇒ Boolean

Checks whether database with such name exists in Athena.

Returns:

  • (Boolean)


103
104
105
106
107
# File 'lib/egis/database.rb', line 103

# Checks whether this database exists in Athena.
#
# The sibling create/drop methods address the database as
# translate_name(name), so the lookup uses that same translated name for both
# the LIKE pattern and the result match; otherwise a database created through
# this object could be reported as missing whenever the translation changes
# the name.
#
# @return [Boolean]
def exists?
  athena_name = translate_name(name)
  query_status = client.execute_query("SHOW DATABASES LIKE '#{athena_name}';", async: false,
                                                                               system_execution: true)
  parsed_result = output_downloader.download(query_status.output_location)
  # The pattern may match several names, so require an exact entry.
  parsed_result.flatten.include?(athena_name)
end

#query_status(query_id) ⇒ Egis::QueryStatus

Check the status of asynchronous query execution.

Parameters:

Returns:



94
95
96
# File 'lib/egis/database.rb', line 94

# Looks up the status of a previously scheduled query by delegating to the
# client.
#
# @param query_id identifier of the query execution
# @return whatever the client's query_status returns
def query_status(query_id)
  athena = client
  athena.query_status(query_id)
end

#table(table_name, table_schema, table_location, **options) ⇒ Egis::Table

Creates Table object. Executing it doesn’t create Athena table yet.

Parameters:

  • table_name (String)
  • table_schema (Egis::TableSchema)
  • table_location (String)

    S3 URL with table location (e.g. `s3://s3_bucket/table/location/`)

  • format (:tsv, :csv, :orc, :orc_index_access, :json, String)

    Table Format (defaults to :tsv)

Returns:



34
35
36
# File 'lib/egis/database.rb', line 34

# Builds a Table bound to this database and its client. Extra keyword
# arguments are handed to the table as a single +options+ hash.
#
# @param [String] table_name
# @param [Egis::TableSchema] table_schema
# @param [String] table_location S3 URL with table location
# @param options additional table options (e.g. format)
# @return [Egis::Table]
def table(table_name, table_schema, table_location, **options)
  positional = [self, table_name, table_schema, table_location]
  Table.new(*positional, client: client, options: options)
end