Class: DWH::Adapters::Druid

Inherits:
Adapter
  • Object
show all
Defined in:
lib/dwh/adapters/druid.rb

Overview

Druid adapter.

Generally, adapters should be created using DWH.create, where a configuration is passed in as an options hash or argument list.

Examples:

Basic connection with required only options

DWH.create(:druid, {host: 'localhost',port: 8080, protocol: 'http'})

Connect with SSL and basic authorization

DWH.create(:druid, {host: 'localhost',port: 8080, protocol: 'http',
    basic_auth: 'BASE_64 encoded authorization key'
})

Sending custom client name and user information

DWH.create(:druid, {host: 'localhost',port: 8080,
  client_name: 'Strata CLI', extra_connection_params: {
    context: {
      user: 'Ajo',
      team: 'Engineering'
    }
  }})

Constant Summary collapse

DRUID_STATUS =
'/status'.freeze
DRUID_DATASOURCES =
'/druid/coordinator/v1/datasources'.freeze
DRUID_SQL =
'/druid/v2/sql/'.freeze
COLUMNS_FOR_TABLE =
'"COLUMN_NAME","DATA_TYPE", "NUMERIC_PRECISION", "NUMERIC_SCALE", "CHARACTER_MAXIMUM_LENGTH"'.freeze

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary collapse

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connectionObject

Creates a connection to the target database and returns the connection object or self



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/dwh/adapters/druid.rb', line 39

# Lazily builds and memoizes the Faraday HTTP connection to the Druid
# broker, using protocol/host/port from config. Optional basic auth and
# any extra_connection_params are folded in.
#
# @return [Faraday::Connection]
def connection
  @connection ||= begin
    # Only attach the Authorization header when basic_auth is configured.
    auth_header =
      if config[:basic_auth]
        { 'Authorization' => "Basic #{config[:basic_auth]}" }
      else
        {}
      end

    Faraday.new(
      url: "#{config[:protocol]}://#{config[:host]}:#{config[:port]}",
      headers: { 'Content-Type' => 'application/json' }.merge(auth_header),
      request: {
        timeout: config[:query_timeout],
        open_timeout: config[:open_timeout],
        context: { client_name: config[:client_name] }
      }.merge(extra_connection_params)
    )
  end
end

#drop_unused_segments(table, interval) ⇒ Object

Marks unused segments of a datasource/table as unused

Parameters:

  • table (String)

    datasource/table name

  • interval (String)

    date interval in the format of from_date/to_date as valid ISO timestamps



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/dwh/adapters/druid.rb', line 111

# Marks segments of the given datasource as unused via the coordinator API.
#
# @param table [String] datasource/table name
# @param interval [String] ISO interval "from/to" whose segments are dropped
def drop_unused_segments(table, interval)
  endpoint = "/druid/coordinator/v1/datasources/#{table}/markUnused"

  logger.debug '=== Dropping Segments ==='

  response = connection.post(endpoint) do |request|
    request.headers['Content-Type'] = 'application/json'
    request.body = { interval: interval }.to_json
  end

  logger.debug response.status
end

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns array of array

    • object returns array of Hashes

    • csv returns as csv

    • native returns the native result from any clients used

      • For example: Postgres using pg client will return PG::Result

      • Http clients will returns the HTTP response object

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/dwh/adapters/druid.rb', line 151

# Executes sql against the Druid SQL endpoint.
#
# @param sql [String] the query to run
# @param format [Symbol, String] :array, :object, :csv or :native
# @param retries [Integer] retry count on failure
# @return [Array, Hash, String, Faraday::Response] parsed rows, raw CSV,
#   or the raw HTTP response for :native
# @raise [ExecutionError] when Druid responds with a non-200 status
def execute(sql, format: :array, retries: 0)
  format = format.to_sym
  # Druid has no 'native' result format; ask for arrays and hand the raw
  # HTTP response back to the caller instead.
  result_format = format == :native ? 'array' : format.to_s

  resp = with_debug(sql) do
    with_retry(retries) do
      connection.post(DRUID_SQL) do |req|
        req.headers['Content-Type'] = 'application/json'
        payload = {
          query: sql,
          resultFormat: result_format,
          context: { sqlTimeZone: 'Etc/UTC' }
        }.merge(extra_query_params)
        req.body = payload.to_json
      end
    end
  end

  raise ExecutionError, "Could not execute #{sql}: \n #{resp.body}" if resp.status != 200

  case format
  when :native then resp
  when :csv then resp.body
  else JSON.parse(resp.body)
  end
end

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    collect stats and preview data this is optional

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/dwh/adapters/druid.rb', line 178

# Streams the query result as CSV (with header row) into io, optionally
# collecting stats/preview data along the way.
#
# @param sql [String] the query to run
# @param io [IO] destination for CSV rows
# @param stats [StreamingStats, nil] optional stats collector
# @param retries [Integer] retry count on failure
# @return [IO] the same io, rewound
# @raise [ExecutionError] when the response is not successful
def execute_stream(sql, io, stats: nil, retries: 0)
  resp = with_debug(sql) do
    with_retry(retries) do
      connection.post(DRUID_SQL) do |req|
        req.headers['Content-Type'] = 'application/json'
        req.body = {
          query: sql,
          resultFormat: 'csv',
          header: true
          # Timezone is deliberately omitted here: a Druid bug made
          # date sub-query joins fail when it was set.
          # context: { sqlTimeZone: 'Etc/UTC'}
        }.merge(extra_query_params).to_json

        # Carries any partial CSV row between chunks so only complete
        # rows are handled.
        buffer = ''
        req.options.on_data = proc do |chunk, _|
          handle_streaming_chunk(io, chunk, stats, buffer)
        end
      end
    end
  end

  io.rewind
  # On failure the error body was streamed into io; surface it.
  raise ExecutionError, io.read unless resp.success?

  io
end

#metadata(table) ⇒ DWH::Table

Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Example:

("public.big_table")
("big_table")
("big_table",schema: "public")

Parameters:

  • table (String)
    • table name

  • qualifiers (Hash)

    a customizable set of options

Returns:



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/dwh/adapters/druid.rb', line 125

# Builds a DWH::Table describing the given Druid datasource: row/date
# stats plus one Column per INFORMATION_SCHEMA entry, classified as
# 'measure' when Druid reports it as a metric, 'dimension' otherwise.
#
# @param table [String] datasource/table name
# @return [DWH::Table]
def metadata(table)
  # NOTE(review): table is interpolated directly into the SQL literal;
  # Druid's SQL API offers no identifier binding, but callers should not
  # pass untrusted input here.
  sql = <<-SQL
  SELECT #{COLUMNS_FOR_TABLE} FROM INFORMATION_SCHEMA.COLUMNS
  WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = '#{table}'
  SQL

  stats = stats(table)
  # Bug fix: previously passed the literal string 'table', naming every
  # returned Table "table" instead of the requested datasource.
  db_table = Table.new table, table_stats: stats
  cols = execute(sql, format: :object)
  st = table_druid_schema_types(table, stats.date_end)

  cols.each do |col|
    db_table << Column.new(
      name: col['COLUMN_NAME'],
      schema_type: st[:metrics].include?(col['COLUMN_NAME']) ? 'measure' : 'dimension',
      data_type: col['DATA_TYPE'],
      precision: col['NUMERIC_PRECISION'],
      scale: col['NUMERIC_SCALE'],
      max_char_length: col['CHARACTER_MAXIMUM_LENGTH']
    )
  end

  db_table
end

#stats(table, date_column: '__time') ⇒ Object

Date column will default to __time. If the datasource does not have a date column, please set it to nil.

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: '__time')

    optional date column

See Also:



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/dwh/adapters/druid.rb', line 89

# Collects row count and, when a date column is present, the min/max
# dates for the given datasource.
#
# @param table [String] table name
# @param date_column [String, nil] date column; nil skips date bounds
# @return [TableStats]
def stats(table, date_column: '__time')
  sql = <<-SQL
  SELECT
  count(*) ROW_COUNT
  #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
  #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
  FROM "#{table}"
  SQL

  # Single-row result: [row_count, date_start, date_end]; the date slots
  # are absent (nil) when date_column is nil.
  row = execute(sql).first

  TableStats.new(
    row_count: row[0],
    date_start: row[1],
    date_end: row[2]
  )
end

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/dwh/adapters/druid.rb', line 208

# Runs sql with CSV output and yields each streamed chunk (forced to
# UTF-8) to the given block as it arrives.
#
# @param sql [String] the query to run
# @yield [chunk] each raw CSV chunk
# @return [Integer] number of chunks yielded
def stream(sql, &block)
  chunk_count = 0
  with_debug(sql) do
    connection.post(DRUID_SQL) do |req|
      req.headers['Content-Type'] = 'application/json'
      req.body = { query: sql, resultFormat: 'csv' }.to_json
      req.options.on_data = proc do |chunk, _size|
        block.call chunk.force_encoding('utf-8')
        chunk_count += 1
      end
    end
  end

  chunk_count
end

#tablesArray<String>

Get all tables available in the target db. It will use the default catalog and schema specified in the config.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Returns:

  • (Array<String>)


77
78
79
80
81
82
# File 'lib/dwh/adapters/druid.rb', line 77

# Lists all datasources known to the Druid coordinator.
#
# @return [Array<String>] datasource names
def tables
  # Coordinator metadata calls get a fixed 30s timeout regardless of the
  # configured query timeout.
  response = connection.get(DRUID_DATASOURCES) { |req| req.options.timeout = 30 }
  JSON.parse response.body
end

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database and returns true if successful; otherwise raises a ConnectionError (when raise_exception is true) or returns false.

Returns:

Raises:



61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/dwh/adapters/druid.rb', line 61

# Checks connectivity by hitting the Druid /status endpoint.
#
# @param raise_exception [Boolean] raise instead of returning false on failure
# @return [Boolean]
# @raise [ConnectionError] when the check fails and raise_exception is true
def test_connection(raise_exception: false)
  res = connection.get(DRUID_STATUS)
  unless res.success?
    raise ConnectionError, res.body if raise_exception

    # Bug fix: without the explicit return, execution fell through to
    # `true` below and reported failed connections as successful.
    return false
  end

  true
rescue Faraday::ConnectionFailed => e
  raise ConnectionError, e.message if raise_exception

  false
end