Class: DWH::Adapters::Druid
- Defined in:
- lib/dwh/adapters/druid.rb
Overview
Druid adapter.
Generally, adapters should be created using DWH.create, where a configuration is passed in as an options hash or argument list.
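For example, a Druid adapter might be configured with an options hash like the one below. The option keys mirror those read by #connection further down; the exact DWH.create signature and the :druid adapter name are assumptions, so check them against your installed version.

```ruby
# Hypothetical configuration for a Druid adapter. Keys such as
# :protocol, :host, :port, :query_timeout, :open_timeout and
# :client_name are taken from the #connection implementation below.
config = {
  protocol: 'http',
  host: 'localhost',
  port: 8082,          # Druid broker default port (illustrative)
  query_timeout: 600,  # seconds
  open_timeout: 10,
  client_name: 'my-app'
}

# Requires the dwh gem and a reachable Druid cluster:
# adapter = DWH.create(:druid, config)
```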
Constant Summary collapse
- DRUID_STATUS =
'/status'.freeze
- DRUID_DATASOURCES =
'/druid/coordinator/v1/datasources'.freeze
- DRUID_SQL =
'/druid/v2/sql/'.freeze
- COLUMNS_FOR_TABLE =
'"COLUMN_NAME","DATA_TYPE", "NUMERIC_PRECISION", "NUMERIC_SCALE", "CHARACTER_MAXIMUM_LENGTH"'.freeze
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary collapse
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#drop_unused_segments(table, interval) ⇒ Object
Marks segments of a datasource/table as unused.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#metadata(table) ⇒ DWH::Table
Get the schema structure of a given table name.
-
#stats(table, date_column: '__time') ⇒ Object
Date column will default to __time.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#tables ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database; returns true if successful, otherwise raises an exception or returns false.
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
# File 'lib/dwh/adapters/druid.rb', line 39

def connection
  return @connection if @connection

  @connection = Faraday.new(
    url: "#{config[:protocol]}://#{config[:host]}:#{config[:port]}",
    headers: {
      'Content-Type' => 'application/json',
      **(config[:basic_auth] ? { 'Authorization' => "Basic #{config[:basic_auth]}" } : {})
    },
    request: {
      timeout: config[:query_timeout],
      open_timeout: config[:open_timeout],
      context: { client_name: config[:client_name] }
    }.merge(extra_connection_params)
  )

  @connection
end
#drop_unused_segments(table, interval) ⇒ Object
Marks segments of a datasource/table as unused.
# File 'lib/dwh/adapters/druid.rb', line 111

def drop_unused_segments(table, interval)
  url = "/druid/coordinator/v1/datasources/#{table}/markUnused"
  logger.debug '=== Dropping Segments ==='

  response = connection.post(url) do |req|
    req.headers['Content-Type'] = 'application/json'
    req.body = { interval: interval }.to_json
  end

  logger.debug response.status
end
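The coordinator endpoint expects an ISO-8601 interval in the JSON body. A sketch of the URL and payload this method sends (the table name and interval are illustrative):

```ruby
require 'json'

table    = 'events'
interval = '2023-01-01/2023-02-01' # ISO-8601 interval

url  = "/druid/coordinator/v1/datasources/#{table}/markUnused"
body = { interval: interval }.to_json
```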
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
# File 'lib/dwh/adapters/druid.rb', line 151

def execute(sql, format: :array, retries: 0)
  format = format.to_sym
  result_format = format == :native ? 'array' : format.to_s

  resp = with_debug(sql) do
    with_retry(retries) do
      connection.post(DRUID_SQL) do |req|
        req.headers['Content-Type'] = 'application/json'
        req.body = {
          query: sql,
          resultFormat: result_format,
          context: { sqlTimeZone: 'Etc/UTC' }
        }.merge(extra_query_params).to_json
      end
    end
  end

  raise ExecutionError, "Could not execute #{sql}: \n #{resp.body}" if resp.status != 200

  if format == :native
    resp
  else
    format == :csv ? resp.body : JSON.parse(resp.body)
  end
end
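The method posts a JSON payload to the /druid/v2/sql/ endpoint and, for the :array format, parses the JSON response into row arrays. A sketch of both halves, using a canned response body in place of a live Druid call:

```ruby
require 'json'

sql = 'SELECT page, COUNT(*) FROM wikipedia GROUP BY page'

# Payload shape as built by #execute (without extra_query_params)
payload = {
  query: sql,
  resultFormat: 'array',
  context: { sqlTimeZone: 'Etc/UTC' }
}.to_json

# With resultFormat 'array', Druid returns a JSON array of row arrays.
# This body is canned for illustration:
canned_body = '[["Main_Page",120],["Ruby",42]]'
rows = JSON.parse(canned_body)
# rows[0] => ["Main_Page", 120]
```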
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.
# File 'lib/dwh/adapters/druid.rb', line 178

def execute_stream(sql, io, stats: nil, retries: 0)
  resp = with_debug(sql) do
    with_retry(retries) do
      connection.post(DRUID_SQL) do |req|
        req.headers['Content-Type'] = 'application/json'
        req.body = {
          query: sql,
          resultFormat: 'csv',
          header: true
          # added timezone here due to druid bug
          # where date sub query joins failed without it.
          # context: { sqlTimeZone: 'Etc/UTC'}
        }.merge(extra_query_params).to_json

        parseable_row = ''
        req.options.on_data = proc do |chunk, _|
          handle_streaming_chunk(io, chunk, stats, parseable_row)
        end
      end
    end
  end

  io.rewind

  # Raise exception on failed runs
  raise ExecutionError, io.read unless resp.success?

  io
end
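HTTP chunks rarely align with CSV row boundaries, which is why a parseable_row buffer is threaded through handle_streaming_chunk (a private helper not shown on this page). A simplified sketch of the buffering idea, assuming the helper works roughly like this:

```ruby
require 'stringio'

io = StringIO.new
buffer = +'' # carries a partial row across chunk boundaries

# Hypothetical stand-in for handle_streaming_chunk: flush only
# complete rows to io, keep the trailing partial row in the buffer.
write_complete_rows = lambda do |chunk|
  buffer << chunk
  *complete, tail = buffer.split("\n", -1)
  buffer.replace(tail)
  complete.each { |row| io.puts(row) }
end

# Simulated chunks split mid-row, as a network read might deliver them
write_complete_rows.call("a,1\nb,")
write_complete_rows.call("2\nc,3\n")
io.rewind
```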
#metadata(table) ⇒ DWH::Table
Get the schema structure of a given table name. Pass in optional catalog and schema info.
Example:
metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")
# File 'lib/dwh/adapters/druid.rb', line 125

def metadata(table)
  sql = <<-SQL
    SELECT #{COLUMNS_FOR_TABLE}
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = '#{table}'
  SQL

  stats = stats(table)
  db_table = Table.new table, table_stats: stats
  cols = execute(sql, format: :object)
  st = table_druid_schema_types(table, stats.date_end)

  cols.each do |col|
    db_table << Column.new(
      name: col['COLUMN_NAME'],
      schema_type: st[:metrics].include?(col['COLUMN_NAME']) ? 'measure' : 'dimension',
      data_type: col['DATA_TYPE'],
      precision: col['NUMERIC_PRECISION'],
      scale: col['NUMERIC_SCALE'],
      max_char_length: col['CHARACTER_MAXIMUM_LENGTH']
    )
  end

  db_table
end
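Columns listed among the datasource's metrics become measures; everything else is a dimension. A sketch of that classification, assuming table_druid_schema_types returns a hash with a :metrics array (the column names are illustrative):

```ruby
# Hypothetical output of table_druid_schema_types for a wikipedia-style
# datasource: 'added' and 'deleted' are ingestion-time metrics.
schema_types = { metrics: %w[added deleted] }

columns = %w[__time page added deleted]

classified = columns.map do |name|
  [name, schema_types[:metrics].include?(name) ? 'measure' : 'dimension']
end.to_h
# => {"__time"=>"dimension", "page"=>"dimension",
#     "added"=>"measure", "deleted"=>"measure"}
```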
#stats(table, date_column: '__time') ⇒ Object
Date column will default to __time. If the datasource does not have a date column, set date_column to nil.
# File 'lib/dwh/adapters/druid.rb', line 89

def stats(table, date_column: '__time')
  sql = <<-SQL
    SELECT count(*) ROW_COUNT
    #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
    #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
    FROM "#{table}"
  SQL

  result = execute(sql)

  TableStats.new(
    row_count: result[0][0],
    date_start: result[0][1],
    date_end: result[0][2]
  )
end
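The generated SQL only includes the min/max date aggregates when a date column is present. A small sketch mirroring the string the method builds, for both cases:

```ruby
# Hypothetical helper mirroring the SQL built inside #stats
build_stats_sql = lambda do |table, date_column|
  <<~SQL
    SELECT count(*) ROW_COUNT
    #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
    #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
    FROM "#{table}"
  SQL
end

with_dates    = build_stats_sql.call('events', '__time')
without_dates = build_stats_sql.call('events', nil)
```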
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
# File 'lib/dwh/adapters/druid.rb', line 208

def stream(sql, &block)
  on_data_calls = 0

  with_debug(sql) do
    connection.post(DRUID_SQL) do |req|
      req.headers['Content-Type'] = 'application/json'
      req.body = { query: sql, resultFormat: 'csv' }.to_json
      req.options.on_data = proc do |chunk, _chunk_size|
        block.call chunk.force_encoding('utf-8')
        on_data_calls += 1
      end
    end
  end

  on_data_calls
end
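Unlike #execute_stream, #stream yields raw CSV chunks as they arrive and leaves row reassembly to the caller's block. A sketch of a block that accumulates chunks (the chunk values are simulated, split mid-row as a network read might deliver them):

```ruby
# Simulated chunks as Druid might deliver them over HTTP
chunks = ["page,count\nMain_Page,", "120\nRuby,42\n"]

received = +''
handler = proc do |chunk|
  # dup so force_encoding never mutates a (possibly frozen) literal;
  # on real network chunks #stream mutates the chunk directly
  received << chunk.dup.force_encoding('utf-8')
end

chunks.each { |c| handler.call(c) }
```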
#tables ⇒ Array<String>
Get all tables available in the target db. It will use the default catalog and schema from the adapter config.
# File 'lib/dwh/adapters/druid.rb', line 77

def tables
  resp = connection.get(DRUID_DATASOURCES) do |req|
    req.options.timeout = 30
  end

  JSON.parse resp.body
end
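The coordinator's datasources endpoint returns a plain JSON array of datasource names, so the whole method reduces to one GET plus JSON.parse. With a canned response body in place of a live call:

```ruby
require 'json'

# Canned body, illustrative of GET /druid/coordinator/v1/datasources
canned_body = '["wikipedia","events","metrics_rollup"]'
tables = JSON.parse(canned_body)
```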
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database; returns true if successful, otherwise raises an exception or returns false.
# File 'lib/dwh/adapters/druid.rb', line 61

def test_connection(raise_exception: false)
  res = connection.get(DRUID_STATUS)

  unless res.success?
    raise ConnectionError, res.body if raise_exception

    return false
  end

  true
rescue Faraday::ConnectionFailed => e
  raise ConnectionError, e.message if raise_exception

  false
end