Class: DWH::Adapters::Trino

Inherits:
Adapter
  • Object
show all
Defined in:
lib/dwh/adapters/trino.rb

Overview

Trino adapter. This should work for Presto as well. This adapter requires the trino-client-ruby gem.

Create adapter instances using DWH.create.

Examples:

Basic connection with only the required options

DWH.create(:trino, {host: 'localhost', catalog: 'native', username: 'Ajo'})

Connect with extra http headers

DWH.create(:trino, {host: 'localhost', port: 8080,
  catalog: 'native', username: 'Ajo',
  extra_connection_params: {
    http_headers: {
      'X-Trino-User' => 'True User Name',
      'X-Forwarded-Request' => '<request passed down from client'
    }
  }
  })

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary collapse

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connection ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/dwh/adapters/trino.rb', line 32

# Lazily builds and memoizes the Trino client used by this adapter.
#
# Connection properties are read from +config+ and then merged with
# +extra_connection_params+, so extra params win on key collisions.
#
# NOTE: when +config[:ssl]+ is truthy the client is handed
# <tt>{ verify: false }</tt>; otherwise the falsy value is passed through as is.
#
# @return [Object] whatever +::Trino::Client.new+ returns (memoized)
# @raise [ConfigError] when building the client raises any StandardError
def connection
  @connection ||= begin
    ssl_flag = config[:ssl]
    ssl_options = ssl_flag && { verify: false }

    base_properties = {
      server: "#{config[:host]}:#{config[:port]}",
      ssl: ssl_options,
      schema: config[:schema],
      catalog: config[:catalog],
      user: config[:username],
      password: config[:password],
      query_timeout: config[:query_timeout],
      source: config[:client_name]
    }

    ::Trino::Client.new(base_properties.merge(extra_connection_params))
  end
rescue StandardError => e
  raise ConfigError, e.message
end

#execute(sql, format: :array, retries: 2) ⇒ Array<Array>, ...

Execute sql on the target database.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/dwh/adapters/trino.rb', line 139

# Execute sql on the target database and return the result in the requested shape.
#
# The format is validated before the statement is sent, so an unsupported
# format never causes a query to run.
#
# @param sql [String] the statement to run
# @param format [Symbol] one of :native, :array, :object or :csv
#   - :native returns whatever +connection.run+ returns
#   - :array  returns the second element of the native result
#   - :object returns whatever +connection.run_with_names+ returns
#   - :csv    returns +result_to_csv+ applied to the native result
# @param retries [Integer] passed to +with_retry+
# @return [Object] shape depends on +format+ (see above)
# @raise [UnsupportedCapability] when +format+ is not one of the supported values
# @raise [ExecutionError] when the client raises Trino::Client::TrinoQueryError
def execute(sql, format: :array, retries: 2)
  unless %i[native csv array object].include?(format)
    raise UnsupportedCapability, "Unknown format type: #{format}. Should be :native, :array, :object, or :csv"
  end

  result = with_debug(sql) do
    with_retry(retries) do
      format == :object ? connection.run_with_names(sql) : connection.run(sql)
    end
  end

  case format
  when :csv then result_to_csv(result)
  when :array then result[1]
  else result # :native and :object are returned untouched
  end
rescue ::Trino::Client::TrinoQueryError => e
  raise ExecutionError, e.message
end

#execute_stream(sql, io, stats: nil, retries: 1) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/dwh/adapters/trino.rb', line 167

# Runs +sql+ and writes the result to +io+ as CSV: one header line built from
# the result's column names, followed by one line per row.
#
# @param sql [String] the statement to run
# @param io [#write, #<<, #rewind] destination for the CSV output
# @param stats [#<<, nil] optional collector; every row is appended to it
# @param retries [Integer] passed to +with_retry+
# @return [Object] the same +io+, rewound
#
# NOTE(review): the writes happen inside the +with_retry+ block, so a retry
# after a partial write would presumably emit the header/rows again — confirm
# against +with_retry+'s semantics.
def execute_stream(sql, io, stats: nil, retries: 1)
  with_debug(sql) do
    with_retry(retries) do
      connection.query(sql) do |result|
        header = result.columns.map(&:name)
        io.write(CSV.generate_line(header))

        result.each_row do |values|
          stats << values if stats
          io << CSV.generate_line(values)
        end
      end
    end
  end

  io.tap(&:rewind)
end

#metadata(table, **qualifiers) ⇒ DWH::Table

Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Example:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Options Hash (**qualifiers):

  • :catalog (String)

    Optional catalog or equivalent namespace. Ignored if the adapter doesn't support it.

  • :schema (String)

    Optional schema to scope to. Ignored if the adapter doesn't support it.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/dwh/adapters/trino.rb', line 117

# Describe a table's columns by running SHOW COLUMNS against it.
#
# Column types that start with "row(" are reported as 'struct'; every other
# type string is passed through unchanged.
#
# @param table [String] table name, passed to +Table.new+
# @param qualifiers [Hash] optional :catalog / :schema, passed to +Table.new+
# @return [Table] the table object with one Column appended per result row
def metadata(table, **qualifiers)
  db_table = Table.new table, **qualifiers
  sql = "SHOW COLUMNS FROM #{db_table.fully_qualified_table_name}"

  # The native result is destructured as [first, rows]; only the rows are used.
  _, cols = execute(sql, format: :native, retries: 1)

  cols.each do |col|
    # col[0] is used as the column name, col[1] as its type string.
    dt = col[1].start_with?('row(') ? 'struct' : col[1]
    db_table << Column.new(
      name: col[0],
      data_type: dt
    )
  end

  db_table
end

#schema? ⇒ Boolean



134
135
136
# File 'lib/dwh/adapters/trino.rb', line 134

# Whether the adapter config has a :schema entry.
#
# Only the presence of the key is checked, so a :schema key whose value is
# nil still counts.
#
# @return [Boolean]
def schema?
  adapter_config = config
  adapter_config.key?(:schema)
end

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table",schema: "public")

Options Hash (**qualifiers):

  • :catalog (String)

    Optional catalog or equivalent namespace. Ignored if the adapter doesn't support it.

  • :schema (String)

    Optional schema to scope to. Ignored if the adapter doesn't support it.

Raises:



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/dwh/adapters/trino.rb', line 97

# Collect basic statistics for a table with a single aggregate query.
#
# The query always selects count(*); min/max of +date_column+ are added only
# when a date column is supplied, otherwise the resulting date_start and
# date_end come from missing row positions (nil for an Array row).
#
# @param table [String] table name, passed to +Table.new+
# @param date_column [String, nil] column used for the min/max date range
# @param qualifiers [Hash] optional :catalog / :schema, passed to +Table.new+
# @return [TableStats] built from the first row of the result
def stats(table, date_column: nil, **qualifiers)
  target = Table.new(table, **qualifiers)

  # Both fragments stay nil (and interpolate as empty) without a date column.
  min_select = ", min(#{date_column}) DATE_START" unless date_column.nil?
  max_select = ", max(#{date_column}) DATE_END" unless date_column.nil?

  query = <<-SQL
              SELECT count(*) ROW_COUNT
                  #{min_select}
                  #{max_select}
              FROM #{target.fully_qualified_table_name}
  SQL

  first_row = execute(query, retries: 1)[0]
  total, earliest, latest = first_row[0], first_row[1], first_row[2]

  TableStats.new(date_start: earliest, date_end: latest, row_count: total)
end

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



185
186
187
188
189
190
191
# File 'lib/dwh/adapters/trino.rb', line 185

# Runs +sql+ and hands every result row to the given block as it is read.
#
# @param sql [String] the statement to run
# @yield [row] each row produced by the result's +each_row+
# @return [Object] whatever +with_debug+ returns
def stream(sql, &block)
  with_debug(sql) do
    connection.query(sql) { |result| result.each_row(&block) }
  end
end

#table?(table, **qualifiers) ⇒ Boolean

Check if table exists in remote db.

Options Hash (**qualifiers):

  • :catalog (String)

    Optional catalog or equivalent namespace. Ignored if the adapter doesn't support it.

  • :schema (String)

    Optional schema to scope to. Ignored if the adapter doesn't support it.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/dwh/adapters/trino.rb', line 81

# Check if a table exists in the remote db.
#
# The lookup uses SHOW TABLES ... LIKE '<name>'. Because LIKE treats `_` and
# `%` in the name as wildcards, the returned rows are additionally checked
# for an exact (case-insensitive) name match, so e.g. "big_table" is not
# reported as existing just because "bigxtable" does.
#
# @param table [String] table name, passed to +Table.new+
# @param qualifiers [Hash] optional :catalog / :schema, passed to +Table.new+
# @return [Boolean] true when a returned table name equals the requested one
def table?(table, **qualifiers)
  db_table = Table.new(table, **qualifiers)
  table_name = db_table.physical_name

  query = ['SHOW TABLES']
  query.push('FROM', db_table.fully_qualified_schema_name) if db_table.catalog_or_schema?
  query << "LIKE '#{table_name}'"

  rows = execute(query.compact.join(' '), retries: 1)

  # LIKE narrows the candidates on the server; this filters out wildcard-only matches.
  rows.flatten.any? { |name| name.to_s.casecmp?(table_name.to_s) }
end

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. The connection's default catalog and schema are used unless a catalog and/or schema is specified here.

Options Hash (**qualifiers):

  • :catalog (String)

    Optional catalog or equivalent namespace. Ignored if the adapter doesn't support it.

  • :schema (String)

    Optional schema to scope to. Ignored if the adapter doesn't support it.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/dwh/adapters/trino.rb', line 64

# List table names via SHOW TABLES, optionally scoped with a FROM clause.
#
# With both :catalog and :schema the scope is "catalog.schema"; with only one
# of them that single value follows FROM; with neither, plain SHOW TABLES runs.
#
# @param qualifiers [Hash] optional :catalog and :schema
# @return [Array] the flattened result rows
def tables(**qualifiers)
  catalog = qualifiers[:catalog]
  schema = qualifiers[:schema]

  scope = catalog && schema ? ["#{catalog}.#{schema}"] : [catalog, schema]
  from_keyword = (catalog || schema) ? 'FROM' : nil

  statement = ['SHOW TABLES', from_keyword, *scope].compact.join(' ')
  execute(statement, retries: 1).flatten
end

#test_connection(raise_exception: false) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/dwh/adapters/trino.rb', line 54

# Probe the connection by running 'select 1'.
#
# Only Trino HTTP errors and Faraday connection failures are handled here;
# any other exception propagates unchanged.
#
# @param raise_exception [Boolean] raise instead of returning false on failure
# @return [Boolean] true when the probe ran, false on a handled failure
# @raise [ConnectionError] on a handled failure when +raise_exception+ is true
def test_connection(raise_exception: false)
  connection.run('select 1')
  true
rescue ::Trino::Client::TrinoHttpError, Faraday::ConnectionFailed => e
  return false unless raise_exception

  raise ConnectionError, e.message
end

#valid_config? ⇒ Boolean



193
194
195
196
197
198
# File 'lib/dwh/adapters/trino.rb', line 193

# Runs the inherited config validation, then makes sure the 'trino-client'
# gem can be loaded.
#
# The gem is required lazily here rather than at file load time. +require+
# returns false when the library is already loaded, so an explicit +true+ is
# returned to keep this predicate from reporting false for a valid setup on
# second and later calls.
#
# @return [Boolean] true when validation and the require both succeed
# @raise [ConfigError] when the 'trino-client' gem cannot be loaded
def valid_config?
  super
  require 'trino-client'
  true
rescue LoadError
  raise ConfigError, "Required 'trino-client' gem missing. Please add it to your Gemfile."
end