Class: Ath::Driver

Inherits:
Object
  • Object
show all
Defined in:
lib/ath/driver.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(athena:, s3:, output_location:, database:) ⇒ Driver

Returns a new instance of Driver.



4
5
6
7
8
9
# File 'lib/ath/driver.rb', line 4

# Stores the collaborators and settings used by the other driver methods.
#
# @param athena client object that receives the Athena calls
#   (get_query_execution, list_query_executions, start_query_execution, ...)
# @param s3 client object that receives get_object / head_object
# @param output_location value sent as result_configuration[:output_location]
#   when a query is started
# @param database value sent as query_execution_context[:database]
#   when a query is started
def initialize(athena:, s3:, output_location:, database:)
  # Service clients first, then the query settings.
  @athena, @s3 = athena, s3
  @output_location = output_location
  @database = database
end

Instance Attribute Details

#database ⇒ Object

Returns the value of attribute database.



2
3
4
# File 'lib/ath/driver.rb', line 2

# Reader for the database given to the constructor.
#
# @return the current value of @database
def database
  @database
end

Instance Method Details

#get_query_execution(query_execution_id:) ⇒ Object



11
12
13
# File 'lib/ath/driver.rb', line 11

# Looks up a single query execution through the Athena client.
#
# @param query_execution_id identifier forwarded unchanged to the client
# @return the `query_execution` member of the client's response
def get_query_execution(query_execution_id:)
  response = @athena.get_query_execution(query_execution_id: query_execution_id)
  response.query_execution
end

#get_query_execution_result(query_execution_id:) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/ath/driver.rb', line 20

# Downloads the result object of a query execution from S3 into a temp file.
#
# The object is streamed chunk by chunk. When a block is given, every chunk
# is yielded to it before being written to the temp file.
#
# The returned file is left open and on disk; the caller is responsible for
# closing and deleting it. If the download (or the caller's block) does not
# complete, the temp file is closed and removed here so that it does not
# leak, and the error propagates unchanged.
#
# @param query_execution_id identifier used to resolve the result's S3 location
# @yield [chunk] each chunk handed over by the S3 client (optional)
# @return [File] the flushed, still-open temp file holding the result
def get_query_execution_result(query_execution_id:)
  bucket, key = get_query_execution_result_output_location(query_execution_id: query_execution_id)
  tmp = Tempfile.create('ath')
  completed = false

  begin
    @s3.get_object(bucket: bucket, key: key) do |chunk|
      yield(chunk) if block_given?
      tmp.write(chunk)
    end

    # Push buffered data to disk so readers that open the path see all of it.
    tmp.flush
    completed = true
  ensure
    # Covers exceptions as well as non-local exits (e.g. `break` in the
    # caller's block): nobody would receive the handle, so clean it up.
    unless completed
      tmp.close unless tmp.closed?
      File.unlink(tmp.path) if File.exist?(tmp.path)
    end
  end

  tmp
end

#get_query_execution_result_output_location(query_execution_id:) ⇒ Object



44
45
46
47
48
# File 'lib/ath/driver.rb', line 44

# Resolves where the result of a query execution is stored.
#
# Reads `result_configuration.output_location` from the query execution,
# removes a leading "s3://" and splits the remainder at the first "/".
#
# @param query_execution_id identifier forwarded to the Athena client
# @return [Array<String>] the part before the first "/" followed by the rest
#   (callers in this class treat them as bucket and key); a single-element
#   array when the location contains no "/"
def get_query_execution_result_output_location(query_execution_id:)
  response = @athena.get_query_execution(query_execution_id: query_execution_id)
  s3_uri = response.query_execution.result_configuration.output_location
  path_without_scheme = s3_uri.sub(%r{\As3://}, '')
  path_without_scheme.split('/', 2)
end

#head_query_execution_result(query_execution_id:) ⇒ Object



39
40
41
42
# File 'lib/ath/driver.rb', line 39

# Issues `head_object` on the S3 client for the result of a query execution.
#
# @param query_execution_id identifier used to resolve the result's S3 location
# @return whatever the S3 client's `head_object` returns
def head_query_execution_result(query_execution_id:)
  location = get_query_execution_result_output_location(query_execution_id: query_execution_id)
  target = { bucket: location[0], key: location[1] }
  @s3.head_object(**target)
end

#list_query_executions ⇒ Object



15
16
17
18
# File 'lib/ath/driver.rb', line 15

# Returns execution details for the first 50 query execution IDs reported
# by the Athena listing.
#
# Pages of the listing are consumed lazily, so paging stops as soon as 50
# IDs have been collected instead of walking the entire listing. 50 is the
# maximum number of IDs the batch lookup accepts per call.
#
# @return [Array] the `query_executions` of the batch lookup, or an empty
#   array when the listing contains no IDs (the batch call is skipped then,
#   because it requires at least one ID)
def list_query_executions
  query_execution_ids = @athena.list_query_executions
                               .each_page
                               .lazy
                               .flat_map(&:query_execution_ids)
                               .first(50)
  return [] if query_execution_ids.empty?

  @athena.batch_get_query_execution(query_execution_ids: query_execution_ids).query_executions
end

#output_location ⇒ Object



61
62
63
# File 'lib/ath/driver.rb', line 61

# Reader for the output location sent with newly started queries.
#
# @return the current value of @output_location
def output_location
  @output_location
end

#output_location=(v) ⇒ Object



65
66
67
# File 'lib/ath/driver.rb', line 65

# Replaces the output location sent with newly started queries.
#
# @param v the new value stored in @output_location
def output_location=(v)
  @output_location = v
end

#region ⇒ Object



69
70
71
# File 'lib/ath/driver.rb', line 69

# Reads the region from the Athena client's configuration.
#
# @return the `region` value of `@athena.config`
def region
  client_config = @athena.config
  client_config.region
end

#region=(v) ⇒ Object



73
74
75
76
77
# File 'lib/ath/driver.rb', line 73

# Points the Athena client at another region.
#
# Updates three settings on the client's configuration: `region`,
# `sigv4_region`, and `endpoint` (resolved for the 'athena' service through
# Aws::EndpointProvider).
#
# @param v the region to switch to
def region=(v)
  client_config = @athena.config
  client_config.region = v
  client_config.sigv4_region = v
  client_config.endpoint = Aws::EndpointProvider.resolve(v, 'athena')
end

#start_query_execution(query_string:) ⇒ Object



50
51
52
53
54
55
# File 'lib/ath/driver.rb', line 50

# Submits a query through the Athena client, using the driver's current
# database and output location.
#
# @param query_string the query text, forwarded unchanged
# @return whatever the client's `start_query_execution` returns
def start_query_execution(query_string:)
  request = {
    query_string: query_string,
    query_execution_context: { database: @database },
    result_configuration: { output_location: @output_location }
  }
  @athena.start_query_execution(**request)
end

#stop_query_execution(query_execution_id:) ⇒ Object



57
58
59
# File 'lib/ath/driver.rb', line 57

# Asks the Athena client to stop a query execution.
#
# @param query_execution_id identifier forwarded unchanged to the client
# @return whatever the client's `stop_query_execution` returns
def stop_query_execution(query_execution_id:)
  request = { query_execution_id: query_execution_id }
  @athena.stop_query_execution(**request)
end