Module: DatabricksSQL

Included in:
Databricks
Defined in:
lib/dbx/databricks/sql.rb

Overview

This module handles the execution of SQL statements via the DBX (Databricks) SQL Statement Execution API. For more information about the DBX SQL API, see docs.databricks.com/sql/admin/sql-execution-tutorial.html. Azure-specific tutorial: learn.microsoft.com/en-us/azure/databricks/sql/api/sql-execution-tutorial

Instance Method Summary

Instance Method Details

#get_sql_chunk(chunk_url) ⇒ DatabricksSQLResponse

GET a SQL result chunk from DBX by its internal link (the next_chunk link of a previous response).

Returns:

  • (DatabricksSQLResponse) — wraps the chunk payload, a Hash<{"chunk_index" => Number, "row_offset" => Number, "row_count" => Number, "data_array" => Array<Array>}>

    # rubocop:disable Layout/LineLength



46
47
48
49
50
51
# File 'lib/dbx/databricks/sql.rb', line 46

# Fetch one additional result chunk of a SQL statement from DBX.
#
# The chunk link is echoed to stdout before the request is sent.
#
# @param chunk_url link to the chunk, as returned by a previous
#   response's +next_chunk+
# @return [DatabricksSQLResponse] wrapper around the raw HTTP response
def get_sql_chunk(chunk_url)
  puts "GET chunk: #{chunk_url}"
  chunk_get = Net::HTTP::Get.new(chunk_url, request_headers)
  DatabricksSQLResponse.new(http.request(chunk_get))
end

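#get_sql_results(dbx_sql_response) ⇒ DatabricksSQLResponse

GET the current status and results of a previously submitted SQL statement from DBX, printing "<statement_id>: <status>" to stdout.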



65
66
67
68
69
70
71
# File 'lib/dbx/databricks/sql.rb', line 65

# Ask DBX for the current state (and any results) of a submitted statement.
#
# Prints "<statement_id>: <status>" to stdout as a progress trace.
#
# @param dbx_sql_response [DatabricksSQLResponse] response identifying the
#   statement; only its +statement_id+ is read
# @return [DatabricksSQLResponse] the freshly fetched status response
def get_sql_results(dbx_sql_response)
  id = dbx_sql_response.statement_id
  DatabricksSQLResponse.new(http.request(sql_results_request(id))).tap do |status_response|
    puts "#{id}: #{status_response.status}"
  end
end

#load_additional_chunks(response) ⇒ Object

Load additional chunks of data from DBX and append them to the response, following each chunk's next_chunk link until none remains. DBX returns data in chunks of at most 16 MB.



55
56
57
58
59
60
61
62
# File 'lib/dbx/databricks/sql.rb', line 55

# Follow the chain of +next_chunk+ links and merge every chunk into
# +response+ (via +add_chunk_to_data+), stopping when a chunk has no
# further link.
#
# @param response [DatabricksSQLResponse] first response; mutated in place
# @return [nil]
def load_additional_chunks(response)
  link = response.next_chunk
  loop do
    break unless link

    chunk = get_sql_chunk(link)
    response.add_chunk_to_data(chunk)
    link = chunk.next_chunk
  end
end

#post_sql_request(sql) ⇒ DatabricksSQLResponse

POST a SQL query to DBX and wrap the HTTP response.



32
33
34
35
# File 'lib/dbx/databricks/sql.rb', line 32

# Submit a SQL statement to DBX.
#
# @param sql [String] the statement text
# @return [DatabricksSQLResponse] wrapper around the submission response,
#   whose +statement_id+ is later used for polling
def post_sql_request(sql)
  DatabricksSQLResponse.new(http.request(sql_request(sql)))
end

#run_sql(sql) ⇒ DatabricksSQLResponse

Submit SQL query to DBX and return results.



89
90
91
92
93
94
95
# File 'lib/dbx/databricks/sql.rb', line 89

# Submit a SQL statement, wait until DBX has finished it, and pull in any
# extra result chunks so the returned response holds the full result set.
#
# @param sql [String] the statement text
# @return [DatabricksSQLResponse] the final (non-pending) response
def run_sql(sql)
  wait_for_sql_response(post_sql_request(sql)).tap do |results|
    load_additional_chunks(results) if results.more_chunks?
  end
end

#sql_request(sql) ⇒ Net::HTTP::Post

Build the request object for POST /api/2.0/sql/statements.



20
21
22
23
24
25
26
27
28
29
# File 'lib/dbx/databricks/sql.rb', line 20

# Build (but do not send) the POST request that submits +sql+ to DBX.
#
# +wait_timeout+ is "0s": per the DBX Statement Execution API this makes the
# submission asynchronous; results are fetched separately by polling
# (see wait_for_sql_response).
#
# @param sql [String] the statement text
# @return [Net::HTTP::Post] request with a JSON body
def sql_request(sql)
  payload = {
    "statement" => sql,
    "wait_timeout" => "0s",
    "warehouse_id" => @warehouse
  }.to_json

  Net::HTTP::Post.new(sql_uri.request_uri, request_headers).tap do |post|
    post.body = payload
  end
end

#sql_results_request(statement_id) ⇒ Net::HTTP::Get

Build the request object for GET /api/2.0/sql/statements/{statement_id}.



39
40
41
42
# File 'lib/dbx/databricks/sql.rb', line 39

# Build (but do not send) the GET request that polls one statement.
#
# sql_uri's path already ends with "/", so the id is appended directly.
#
# @param statement_id id returned by DBX when the statement was submitted
# @return [Net::HTTP::Get]
def sql_results_request(statement_id)
  statement_path = sql_uri.request_uri + statement_id.to_s
  Net::HTTP::Get.new(statement_path, request_headers)
end

#sql_uri ⇒ URI::Generic



14
15
16
# File 'lib/dbx/databricks/sql.rb', line 14

# The statements endpoint (sql_url) parsed into a URI object.
#
# @return [URI::Generic]
def sql_uri
  URI.parse(sql_url)
end

#sql_url ⇒ String



10
11
12
# File 'lib/dbx/databricks/sql.rb', line 10

# Absolute URL of the DBX SQL statements endpoint, built from @base_url.
# It keeps a trailing "/" so a statement id can be appended directly.
#
# @return [String]
def sql_url
  @base_url.to_s + "/api/2.0/sql/statements/"
end

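#wait_for_sql_response(response) ⇒ DatabricksSQLResponse

Wait for a SQL query to finish on DBX, polling every @sleep_timer seconds while the statement is pending. Returns the first non-pending DatabricksSQLResponse (which contains the results of the SQL query when it succeeded).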



75
76
77
78
79
80
81
82
83
84
85
# File 'lib/dbx/databricks/sql.rb', line 75

# Poll DBX until the submitted SQL statement is no longer pending.
#
# Between polls the method sleeps for @sleep_timer seconds. Without a
# +timeout+ it polls indefinitely (the historical behavior). With a
# +timeout+ it gives up once that many seconds have elapsed, so a hung
# statement cannot block the caller forever.
#
# @param response [DatabricksSQLResponse] response from post_sql_request;
#   get_sql_results reads only its +statement_id+
# @param timeout [Numeric, nil] maximum seconds to keep polling; +nil+
#   (default) means no limit
# @return [DatabricksSQLResponse] the first status response that is not
#   pending
# @raise [RuntimeError] if +timeout+ is given and the statement is still
#   pending when it expires
def wait_for_sql_response(response, timeout: nil)
  # Monotonic clock: immune to wall-clock adjustments during long polls.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout if timeout

  loop do
    result = get_sql_results(response)
    return result unless result.pending?

    if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise "SQL statement #{response.statement_id} still pending after #{timeout}s"
    end

    sleep(@sleep_timer)
  end
end