Class: DataDuck::RedshiftDestination
- Inherits: Destination
  - Object
  - Database
  - Destination
  - DataDuck::RedshiftDestination
- Defined in: lib/dataduck/redshift_destination.rb
Instance Attribute Summary
- #aws_key ⇒ Object
  Returns the value of attribute aws_key.
- #aws_secret ⇒ Object
  Returns the value of attribute aws_secret.
- #database ⇒ Object
  Returns the value of attribute database.
- #host ⇒ Object
  Returns the value of attribute host.
- #password ⇒ Object
  Returns the value of attribute password.
- #port ⇒ Object
  Returns the value of attribute port.
- #s3_bucket ⇒ Object
  Returns the value of attribute s3_bucket.
- #s3_region ⇒ Object
  Returns the value of attribute s3_region.
- #schema ⇒ Object
  Returns the value of attribute schema.
- #username ⇒ Object
  Returns the value of attribute username.
Attributes inherited from Database
Class Method Summary
- .value_to_string(value) ⇒ Object
Instance Method Summary
- #connection ⇒ Object
- #copy_query(table, s3_path) ⇒ Object
- #create_columns_on_data_warehouse!(table) ⇒ Object
- #create_output_table_with_name!(table, name) ⇒ Object
- #create_output_tables!(table) ⇒ Object
- #create_table_query(table, table_name = nil) ⇒ Object
- #data_as_csv_string(data, property_names) ⇒ Object
- #dbconsole(options = {}) ⇒ Object
- #drop_staging_table!(table) ⇒ Object
- #finish_fully_reloading_table!(table) ⇒ Object
- #get_columns_in_data_warehouse(table_name) ⇒ Object
- #initialize(name, config) ⇒ RedshiftDestination (constructor)
  A new instance of RedshiftDestination.
- #load_table!(table) ⇒ Object
- #merge_from_staging!(table) ⇒ Object
- #query(sql) ⇒ Object
- #recreate_table!(table) ⇒ Object
- #table_names ⇒ Object
- #type_to_redshift_type(which_type) ⇒ Object
- #upload_table_to_s3!(table) ⇒ Object
Methods inherited from Destination
destination, destination_config, load_config!, only_destination
Constructor Details
#initialize(name, config) ⇒ RedshiftDestination
Returns a new instance of RedshiftDestination.
# File 'lib/dataduck/redshift_destination.rb', line 16

def initialize(name, config)
  load_value('aws_key', name, config)
  load_value('aws_secret', name, config)
  load_value('s3_bucket', name, config)
  load_value('s3_region', name, config)
  load_value('host', name, config)
  load_value('port', name, config)
  load_value('database', name, config)
  load_value('schema', name, config)
  load_value('username', name, config)
  load_value('password', name, config)

  @redshift_connection = nil

  super
end
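A minimal construction sketch. The config keys mirror the load_value calls above; all values here are hypothetical placeholders:

require 'dataduck'

config = {
  'aws_key'    => 'AKIA...',           # placeholder AWS credentials
  'aws_secret' => 'secret...',
  's3_bucket'  => 'my-etl-bucket',     # bucket used for staging CSV uploads
  's3_region'  => 'us-east-1',
  'host'       => 'warehouse.abc123.us-east-1.redshift.amazonaws.com',
  'port'       => 5439,
  'database'   => 'analytics',
  'schema'     => 'public',
  'username'   => 'etl_user',
  'password'   => 'etl_password',
}

destination = DataDuck::RedshiftDestination.new('warehouse', config)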
Instance Attribute Details
#aws_key ⇒ Object
Returns the value of attribute aws_key.
# File 'lib/dataduck/redshift_destination.rb', line 5

def aws_key
  @aws_key
end
#aws_secret ⇒ Object
Returns the value of attribute aws_secret.
# File 'lib/dataduck/redshift_destination.rb', line 6

def aws_secret
  @aws_secret
end
#database ⇒ Object
Returns the value of attribute database.
# File 'lib/dataduck/redshift_destination.rb', line 11

def database
  @database
end
#host ⇒ Object
Returns the value of attribute host.
# File 'lib/dataduck/redshift_destination.rb', line 9

def host
  @host
end
#password ⇒ Object
Returns the value of attribute password.
# File 'lib/dataduck/redshift_destination.rb', line 14

def password
  @password
end
#port ⇒ Object
Returns the value of attribute port.
# File 'lib/dataduck/redshift_destination.rb', line 10

def port
  @port
end
#s3_bucket ⇒ Object
Returns the value of attribute s3_bucket.
# File 'lib/dataduck/redshift_destination.rb', line 7

def s3_bucket
  @s3_bucket
end
#s3_region ⇒ Object
Returns the value of attribute s3_region.
# File 'lib/dataduck/redshift_destination.rb', line 8

def s3_region
  @s3_region
end
#schema ⇒ Object
Returns the value of attribute schema.
# File 'lib/dataduck/redshift_destination.rb', line 12

def schema
  @schema
end
#username ⇒ Object
Returns the value of attribute username.
# File 'lib/dataduck/redshift_destination.rb', line 13

def username
  @username
end
Class Method Details
.value_to_string(value) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 238

def self.value_to_string(value)
  string_value = ''

  if value.respond_to? :to_s
    string_value = value.to_s
  end

  string_value.gsub!('"', '""')
  return string_value
end
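The method stringifies a value and doubles any embedded double quotes, so it can be safely wrapped in the quoted CSV fields produced by #data_as_csv_string. For example:

DataDuck::RedshiftDestination.value_to_string('say "hi"')  # => 'say ""hi""'
DataDuck::RedshiftDestination.value_to_string(42)          # => "42"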
Instance Method Details
#connection ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 33

def connection
  @redshift_connection ||= Sequel.connect(
      "redshift://#{ self.username }:#{ self.password }@#{ self.host }:#{ self.port }/#{ self.database }" +
          "?force_standard_strings=f",
      :client_min_messages => '',
      :force_standard_strings => false
  )
end
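The connection is a memoized Sequel database handle, so it can also be queried directly. Continuing the hypothetical `destination` from the constructor sketch above:

db = destination.connection      # built lazily on first use
db["SELECT 1 AS one"].all        # => [{:one=>1}]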
#copy_query(table, s3_path) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 41

def copy_query(table, s3_path)
  properties_joined_string = "\"#{ table.output_column_names.join('","') }\""
  query_fragments = []
  query_fragments << "COPY #{ table.staging_name } (#{ properties_joined_string })"
  query_fragments << "FROM '#{ s3_path }'"
  query_fragments << "CREDENTIALS 'aws_access_key_id=#{ self.aws_key };aws_secret_access_key=#{ self.aws_secret }'"
  query_fragments << "REGION '#{ self.s3_region }'"
  query_fragments << "CSV TRUNCATECOLUMNS ACCEPTINVCHARS EMPTYASNULL"
  query_fragments << "DATEFORMAT 'auto'"
  return query_fragments.join(" ")
end
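As an illustration, for a hypothetical table whose staging_name is zz_dataduck_users and whose output columns are id and name, the generated statement would look like the following (credentials elided, wrapped here for readability; the actual return value is a single space-joined line):

destination.copy_query(users_table, "s3://my-etl-bucket/pending/users_1500000000.csv")
# => COPY zz_dataduck_users ("id","name")
#    FROM 's3://my-etl-bucket/pending/users_1500000000.csv'
#    CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
#    REGION 'us-east-1' CSV TRUNCATECOLUMNS ACCEPTINVCHARS EMPTYASNULL DATEFORMAT 'auto'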
#create_columns_on_data_warehouse!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 53

def create_columns_on_data_warehouse!(table)
  columns = get_columns_in_data_warehouse(table.building_name)
  column_names = columns.map { |col| col[:name].to_s }

  table.output_schema.map do |name, data_type|
    if !column_names.include?(name.to_s)
      redshift_data_type = self.type_to_redshift_type(data_type)
      self.query("ALTER TABLE #{ table.building_name } ADD #{ name } #{ redshift_data_type }")
    end
  end
end
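For example, if a hypothetical users table already has columns id and email, and the table's output_schema additionally declares name: :string, this issues a single statement:

# ALTER TABLE users ADD name varchar(255)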
#create_output_table_with_name!(table, name) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 88

def create_output_table_with_name!(table, name)
  self.query(self.create_table_query(table, name))
end
#create_output_tables!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 78

def create_output_tables!(table)
  self.create_output_table_with_name!(table, table.building_name)
  self.create_columns_on_data_warehouse!(table)

  if table.building_name != table.staging_name
    self.drop_staging_table!(table)
    self.create_output_table_with_name!(table, table.staging_name)
  end
end
#create_table_query(table, table_name = nil) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 64

def create_table_query(table, table_name = nil)
  table_name ||= table.name

  props_array = table.output_schema.map do |name, data_type|
    redshift_data_type = self.type_to_redshift_type(data_type)
    "\"#{ name }\" #{ redshift_data_type }"
  end
  props_string = props_array.join(', ')

  distribution_clause = table.distribution_key ? "DISTKEY(#{ table.distribution_key })" : ""
  index_clause = table.indexes.length > 0 ? "INTERLEAVED SORTKEY (#{ table.indexes.join(',') })" : ""

  "CREATE TABLE IF NOT EXISTS #{ table_name } (#{ props_string }) #{ distribution_clause } #{ index_clause }"
end
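A sketch of the generated DDL for a hypothetical table with output_schema { id: :integer, email: :string }, distribution key id, and a single index on id:

destination.create_table_query(table, "users")
# => CREATE TABLE IF NOT EXISTS users ("id" integer, "email" varchar(255)) DISTKEY(id) INTERLEAVED SORTKEY (id)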
#data_as_csv_string(data, property_names) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 92

def data_as_csv_string(data, property_names)
  data_string_components = [] # for performance reasons, join strings this way

  data.each do |result|
    property_names.each_with_index do |property_name, index|
      value = result[property_name.to_sym]

      if index == 0
        data_string_components << '"'
      end

      data_string_components << DataDuck::RedshiftDestination.value_to_string(value)

      if index == property_names.length - 1
        data_string_components << '"'
      else
        data_string_components << '","'
      end
    end
    data_string_components << "\n"
  end

  return data_string_components.join
end
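Every field is wrapped in double quotes and escaped via .value_to_string, and each row is newline-terminated. A small hypothetical example:

data = [{ id: 1, name: 'Alice' }, { id: 2, name: 'said "hi"' }]
destination.data_as_csv_string(data, ['id', 'name'])
# => "\"1\",\"Alice\"\n\"2\",\"said \"\"hi\"\"\"\n"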
#dbconsole(options = {}) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 130

def dbconsole(options = {})
  args = []
  args << "--host=#{ self.host }"
  args << "--username=#{ self.username }"
  args << "--dbname=#{ self.database }"
  args << "--port=#{ self.port }"

  ENV['PGPASSWORD'] = self.password

  self.find_command_and_execute("psql", *args)
end
#drop_staging_table!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 142

def drop_staging_table!(table)
  self.query("DROP TABLE IF EXISTS #{ table.staging_name }")
end
#finish_fully_reloading_table!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 198

def finish_fully_reloading_table!(table)
  self.query("DROP TABLE IF EXISTS zz_dataduck_old_#{ table.name }")

  table_already_exists = self.table_names.include?(table.name)
  if table_already_exists
    self.query("ALTER TABLE #{ table.name } RENAME TO zz_dataduck_old_#{ table.name }")
  end

  self.query("ALTER TABLE #{ table.staging_name } RENAME TO #{ table.name }")
  self.query("DROP TABLE IF EXISTS zz_dataduck_old_#{ table.name }")
end
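In terms of the SQL issued, a full reload of a hypothetical users table (assuming the zz_dataduck_ staging-table prefix used elsewhere in this class) proceeds roughly as:

# DROP TABLE IF EXISTS zz_dataduck_old_users
# ALTER TABLE users RENAME TO zz_dataduck_old_users   -- only if users already exists
# ALTER TABLE zz_dataduck_users RENAME TO users
# DROP TABLE IF EXISTS zz_dataduck_old_users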
#get_columns_in_data_warehouse(table_name) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 146

def get_columns_in_data_warehouse(table_name)
  cols_query = "SELECT pg_table_def.column AS name, type AS data_type, distkey, sortkey FROM pg_table_def WHERE tablename='#{ table_name }'"
  results = self.query(cols_query)

  columns = []
  results.each do |result|
    columns << {
        name: result[:name],
        data_type: result[:data_type],
        distkey: result[:distkey],
        sortkey: result[:sortkey],
    }
  end

  return columns
end
#load_table!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 210

def load_table!(table)
  DataDuck::Logs.info "Loading table #{ table.name }..."

  s3_object = self.upload_table_to_s3!(table)
  self.create_output_tables!(table)
  self.query(self.copy_query(table, s3_object.s3_path))
  s3_object.delete!

  if table.staging_name != table.building_name
    self.merge_from_staging!(table)
    self.drop_staging_table!(table)
  end
end
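The load pipeline, step by step:

destination.load_table!(users_table)
# 1. upload_table_to_s3!   - serializes table.data to CSV and uploads it
# 2. create_output_tables! - creates the output/staging tables and any new columns
# 3. copy_query            - COPYs the CSV from S3 into the staging table
# 4. s3_object.delete!     - removes the temporary CSV from S3
# 5. merge_from_staging! then drop_staging_table! - only when staging differs from building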
#merge_from_staging!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 163

def merge_from_staging!(table)
  if table.staging_name == table.building_name
    return
  end

  # Following guidelines in http://docs.aws.amazon.com/redshift/latest/dg/merge-examples.html
  staging_name = table.staging_name
  building_name = table.building_name

  delete_query = "DELETE FROM #{ building_name } USING #{ staging_name } WHERE #{ building_name }.id = #{ staging_name }.id" # TODO allow custom or multiple keys
  self.query(delete_query)

  insert_query = "INSERT INTO #{ building_name } (\"#{ table.output_column_names.join('","') }\") SELECT \"#{ table.output_column_names.join('","') }\" FROM #{ staging_name }"
  self.query(insert_query)
end
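For a hypothetical users table with columns id and email (building_name users, staging_name zz_dataduck_users), the two statements are:

# DELETE FROM users USING zz_dataduck_users WHERE users.id = zz_dataduck_users.id
# INSERT INTO users ("id","email") SELECT "id","email" FROM zz_dataduck_users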
#query(sql) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 177

def query(sql)
  Logs.debug("SQL executing on #{ self.name }:\n " + sql)
  self.connection[sql].map { |elem| elem }
end
#recreate_table!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 223

def recreate_table!(table)
  DataDuck::Logs.info "Recreating table #{ table.name }..."

  if !self.table_names.include?(table.name)
    raise Exception.new("Table #{ table.name } doesn't exist on the Redshift database, so it can't be recreated. Did you want to use `dataduck create #{ table.name }` instead?")
  end

  recreating_temp_name = "zz_dataduck_recreating_#{ table.name }"
  self.create_output_table_with_name!(table, recreating_temp_name)
  self.query("INSERT INTO #{ recreating_temp_name } (\"#{ table.output_column_names.join('","') }\") SELECT \"#{ table.output_column_names.join('","') }\" FROM #{ table.name }")
  self.query("ALTER TABLE #{ table.name } RENAME TO zz_dataduck_recreating_old_#{ table.name }")
  self.query("ALTER TABLE #{ recreating_temp_name } RENAME TO #{ table.name }")
  self.query("DROP TABLE zz_dataduck_recreating_old_#{ table.name }")
end
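The rebuild goes through a temporary table, so the original stays queryable until the final rename; for a hypothetical users table the sequence is roughly:

# CREATE TABLE IF NOT EXISTS zz_dataduck_recreating_users (...)
# INSERT INTO zz_dataduck_recreating_users (...) SELECT ... FROM users
# ALTER TABLE users RENAME TO zz_dataduck_recreating_old_users
# ALTER TABLE zz_dataduck_recreating_users RENAME TO users
# DROP TABLE zz_dataduck_recreating_old_users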
#table_names ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 182

def table_names
  self.query("SELECT DISTINCT(tablename) AS name FROM pg_table_def WHERE schemaname='public' ORDER BY name").map { |item| item[:name] }
end
#type_to_redshift_type(which_type) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 116

def type_to_redshift_type(which_type)
  which_type = which_type.to_s

  if ["string", "text", "bigtext"].include?(which_type)
    {
        "string" => "varchar(255)",
        "text" => "varchar(8191)",
        "bigtext" => "varchar(65535)", # Redshift maximum
    }[which_type]
  else
    which_type
  end
end
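String-like schema types map to fixed varchar widths; anything else passes through unchanged:

destination.type_to_redshift_type(:string)   # => "varchar(255)"
destination.type_to_redshift_type("bigtext") # => "varchar(65535)"
destination.type_to_redshift_type(:integer)  # => "integer" (passed through)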
#upload_table_to_s3!(table) ⇒ Object
# File 'lib/dataduck/redshift_destination.rb', line 186

def upload_table_to_s3!(table)
  now_epoch = Time.now.to_i.to_s
  filepath = "pending/#{ table.name.downcase }_#{ now_epoch }.csv"

  table_csv = self.data_as_csv_string(table.data, table.output_column_names)

  s3_obj = S3Object.new(filepath, table_csv,
      self.aws_key, self.aws_secret,
      self.s3_bucket, self.s3_region)
  s3_obj.upload!
  return s3_obj
end
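The S3 key embeds the lowercased table name and the current epoch, so successive uploads do not collide. For a hypothetical table named Users uploaded at epoch 1500000000:

# key: pending/users_1500000000.csv  (in the configured s3_bucket)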