Class: DataDuck::RedshiftDestination

Inherits:
Destination
Defined in:
lib/dataduck/redshift_destination.rb

Instance Attribute Summary

Attributes inherited from Database

#name

Class Method Summary

Instance Method Summary

Methods inherited from Destination

destination, destination_config, load_config!, only_destination

Constructor Details

#initialize(name, config) ⇒ RedshiftDestination

Returns a new instance of RedshiftDestination.



# File 'lib/dataduck/redshift_destination.rb', line 16

def initialize(name, config)
  load_value('aws_key', name, config)
  load_value('aws_secret', name, config)
  load_value('s3_bucket', name, config)
  load_value('s3_region', name, config)
  load_value('host', name, config)
  load_value('port', name, config)
  load_value('database', name, config)
  load_value('schema', name, config)
  load_value('username', name, config)
  load_value('password', name, config)

  @redshift_connection = nil

  super
end
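
A minimal construction sketch (the config keys match the load_value calls above; all values are illustrative placeholders, and any ENV fallbacks depend on your DataDuck configuration):

destination = DataDuck::RedshiftDestination.new('my_destination', {
    'aws_key' => 'AKIA...',
    'aws_secret' => 'secret',
    's3_bucket' => 'my-etl-bucket',
    's3_region' => 'us-east-1',
    'host' => 'example.redshift.amazonaws.com',
    'port' => 5439,
    'database' => 'analytics',
    'schema' => 'public',
    'username' => 'etl_user',
    'password' => 'password',
})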

Instance Attribute Details

#aws_key ⇒ Object

Returns the value of attribute aws_key.



# File 'lib/dataduck/redshift_destination.rb', line 5

def aws_key
  @aws_key
end

#aws_secret ⇒ Object

Returns the value of attribute aws_secret.



# File 'lib/dataduck/redshift_destination.rb', line 6

def aws_secret
  @aws_secret
end

#database ⇒ Object

Returns the value of attribute database.



# File 'lib/dataduck/redshift_destination.rb', line 11

def database
  @database
end

#host ⇒ Object

Returns the value of attribute host.



# File 'lib/dataduck/redshift_destination.rb', line 9

def host
  @host
end

#password ⇒ Object

Returns the value of attribute password.



# File 'lib/dataduck/redshift_destination.rb', line 14

def password
  @password
end

#port ⇒ Object

Returns the value of attribute port.



# File 'lib/dataduck/redshift_destination.rb', line 10

def port
  @port
end

#s3_bucket ⇒ Object

Returns the value of attribute s3_bucket.



# File 'lib/dataduck/redshift_destination.rb', line 7

def s3_bucket
  @s3_bucket
end

#s3_region ⇒ Object

Returns the value of attribute s3_region.



# File 'lib/dataduck/redshift_destination.rb', line 8

def s3_region
  @s3_region
end

#schema ⇒ Object

Returns the value of attribute schema.



# File 'lib/dataduck/redshift_destination.rb', line 12

def schema
  @schema
end

#username ⇒ Object

Returns the value of attribute username.



# File 'lib/dataduck/redshift_destination.rb', line 13

def username
  @username
end

Class Method Details

.value_to_string(value) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 238

def self.value_to_string(value)
  string_value = ''
  if value.respond_to? :to_s
    string_value = value.to_s
  end
  # Use non-destructive gsub: to_s may return a frozen string (e.g. a frozen
  # string literal passed straight through), which gsub! would refuse to mutate.
  string_value = string_value.gsub('"', '""') # double embedded quotes for quoted CSV fields
  return string_value
end
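
For example, embedded double quotes are doubled so the value can sit inside a quoted CSV field:

DataDuck::RedshiftDestination.value_to_string('say "hi"')  # => 'say ""hi""'
DataDuck::RedshiftDestination.value_to_string(nil)         # => ''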

Instance Method Details

#connection ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 33

def connection
  @redshift_connection ||= Sequel.connect("redshift://#{ self.username }:#{ self.password }@#{ self.host }:#{ self.port }/#{ self.database }" +
          "?force_standard_strings=f",
      :client_min_messages => '',
      :force_standard_strings => false
  )
end
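
The connection is memoized, so repeated calls reuse a single Sequel::Database. Note that the redshift:// scheme assumes a Sequel adapter that recognizes it (typically provided by the sequel-redshift gem). A minimal usage sketch, with destination a RedshiftDestination instance:

db = destination.connection
db["SELECT 1 AS one"].all  # => [{:one=>1}]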

#copy_query(table, s3_path) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 41

def copy_query(table, s3_path)
  properties_joined_string = "\"#{ table.output_column_names.join('","') }\""
  query_fragments = []
  query_fragments << "COPY #{ table.staging_name } (#{ properties_joined_string })"
  query_fragments << "FROM '#{ s3_path }'"
  query_fragments << "CREDENTIALS 'aws_access_key_id=#{ self.aws_key };aws_secret_access_key=#{ self.aws_secret }'"
  query_fragments << "REGION '#{ self.s3_region }'"
  query_fragments << "CSV TRUNCATECOLUMNS ACCEPTINVCHARS EMPTYASNULL"
  query_fragments << "DATEFORMAT 'auto'"
  return query_fragments.join(" ")
end
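
As an illustration, for a hypothetical table staged as zz_dataduck_users with output columns id and name, the generated statement (returned as a single space-joined string, wrapped here for readability) looks like:

COPY zz_dataduck_users ("id","name") FROM 's3://my-etl-bucket/pending/users_1450000000.csv'
  CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...' REGION 'us-east-1'
  CSV TRUNCATECOLUMNS ACCEPTINVCHARS EMPTYASNULL DATEFORMAT 'auto'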

#create_columns_on_data_warehouse!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 53

def create_columns_on_data_warehouse!(table)
  columns = get_columns_in_data_warehouse(table.building_name)
  column_names = columns.map { |col| col[:name].to_s }
  # Add any output_schema columns that don't exist on the warehouse table yet
  table.output_schema.each do |name, data_type|
    if !column_names.include?(name.to_s)
      redshift_data_type = self.type_to_redshift_type(data_type)
      self.query("ALTER TABLE #{ table.building_name } ADD #{ name } #{ redshift_data_type }")
    end
  end
end

#create_output_table_with_name!(table, name) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 88

def create_output_table_with_name!(table, name)
  self.query(self.create_table_query(table, name))
end

#create_output_tables!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 78

def create_output_tables!(table)
  self.create_output_table_with_name!(table, table.building_name)
  self.create_columns_on_data_warehouse!(table)

  if table.building_name != table.staging_name
    self.drop_staging_table!(table)
    self.create_output_table_with_name!(table, table.staging_name)
  end
end

#create_table_query(table, table_name = nil) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 64

def create_table_query(table, table_name = nil)
  table_name ||= table.name
  props_array = table.output_schema.map do |name, data_type|
    redshift_data_type = self.type_to_redshift_type(data_type)
    "\"#{ name }\" #{ redshift_data_type }"
  end
  props_string = props_array.join(', ')

  distribution_clause = table.distribution_key ? "DISTKEY(#{ table.distribution_key })" : ""
  index_clause = table.indexes.length > 0 ? "INTERLEAVED SORTKEY (#{ table.indexes.join(',') })" : ""

  "CREATE TABLE IF NOT EXISTS #{ table_name } (#{ props_string }) #{ distribution_clause } #{ index_clause }"
end
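
For example, a hypothetical users table with output_schema {id: :integer, email: :string}, a distribution key of id, and an index on email would produce roughly:

CREATE TABLE IF NOT EXISTS users ("id" integer, "email" varchar(255)) DISTKEY(id) INTERLEAVED SORTKEY (email)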

#data_as_csv_string(data, property_names) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 92

def data_as_csv_string(data, property_names)
  data_string_components = [] # for performance reasons, join strings this way
  data.each do |result|
    property_names.each_with_index do |property_name, index|
      value = result[property_name.to_sym]

      if index == 0
        data_string_components << '"'
      end

      data_string_components << DataDuck::RedshiftDestination.value_to_string(value)

      if index == property_names.length - 1
        data_string_components << '"'
      else
        data_string_components << '","'
      end
    end
    data_string_components << "\n"
  end

  return data_string_components.join
end
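
For example (hypothetical rows; note how value_to_string doubles embedded quotes):

rows = [{id: 1, name: 'Duck'}, {id: 2, name: 'say "hi"'}]
destination.data_as_csv_string(rows, ['id', 'name'])
# => "\"1\",\"Duck\"\n\"2\",\"say \"\"hi\"\"\"\n"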

#dbconsole(options = {}) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 130

def dbconsole(options = {})
  args = []
  args << "--host=#{ self.host }"
  args << "--username=#{ self.username }"
  args << "--dbname=#{ self.database }"
  args << "--port=#{ self.port }"

  ENV['PGPASSWORD'] = self.password

  self.find_command_and_execute("psql", *args)
end

#drop_staging_table!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 142

def drop_staging_table!(table)
  self.query("DROP TABLE IF EXISTS #{ table.staging_name }")
end

#finish_fully_reloading_table!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 198

def finish_fully_reloading_table!(table)
  self.query("DROP TABLE IF EXISTS zz_dataduck_old_#{ table.name }")

  table_already_exists = self.table_names.include?(table.name)
  if table_already_exists
    self.query("ALTER TABLE #{ table.name } RENAME TO zz_dataduck_old_#{ table.name }")
  end

  # Swap the freshly loaded staging table into place, then drop the old copy
  self.query("ALTER TABLE #{ table.staging_name } RENAME TO #{ table.name }")
  self.query("DROP TABLE IF EXISTS zz_dataduck_old_#{ table.name }")
end

#get_columns_in_data_warehouse(table_name) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 146

def get_columns_in_data_warehouse(table_name)
  cols_query = "SELECT pg_table_def.column AS name, type AS data_type, distkey, sortkey FROM pg_table_def WHERE tablename='#{ table_name }'"
  results = self.query(cols_query)

  columns = []
  results.each do |result|
    columns << {
        name: result[:name],
        data_type: result[:data_type],
        distkey: result[:distkey],
        sortkey: result[:sortkey],
    }
  end

  return columns
end
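
The return value is an array with one hash per column, for example (illustrative values):

[{name: "id", data_type: "integer", distkey: true, sortkey: 1},
 {name: "email", data_type: "character varying(255)", distkey: false, sortkey: 0}]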

#load_table!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 210

def load_table!(table)
  DataDuck::Logs.info "Loading table #{ table.name }..."
  s3_object = self.upload_table_to_s3!(table)
  self.create_output_tables!(table)
  self.query(self.copy_query(table, s3_object.s3_path))
  s3_object.delete!

  if table.staging_name != table.building_name
    self.merge_from_staging!(table)
    self.drop_staging_table!(table)
  end
end
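
A minimal end-to-end sketch (destination and table are hypothetical; the table is assumed to already hold its extracted and transformed data):

destination = DataDuck::RedshiftDestination.new('my_destination', config)  # config keys as in #initialize
destination.load_table!(table)  # upload CSV to S3, COPY into staging, merge, clean up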

#merge_from_staging!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 163

def merge_from_staging!(table)
  if table.staging_name == table.building_name
    return
  end

  # Following guidelines in http://docs.aws.amazon.com/redshift/latest/dg/merge-examples.html
  staging_name = table.staging_name
  building_name = table.building_name
  delete_query = "DELETE FROM #{ building_name } USING #{ staging_name } WHERE #{ building_name }.id = #{ staging_name }.id" # TODO allow custom or multiple keys
  self.query(delete_query)
  insert_query = "INSERT INTO #{ building_name } (\"#{ table.output_column_names.join('","') }\") SELECT \"#{ table.output_column_names.join('","') }\" FROM #{ staging_name }"
  self.query(insert_query)
end
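
For a hypothetical users table staged as zz_dataduck_users with columns id and name, the merge issues roughly these two statements (note the hardcoded id key; see the TODO above):

DELETE FROM users USING zz_dataduck_users WHERE users.id = zz_dataduck_users.id
INSERT INTO users ("id","name") SELECT "id","name" FROM zz_dataduck_users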

#query(sql) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 177

def query(sql)
  Logs.debug("SQL executing on #{ self.name }:\n  " + sql)
  self.connection[sql].map { |elem| elem }
end

#recreate_table!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 223

def recreate_table!(table)
  DataDuck::Logs.info "Recreating table #{ table.name }..."

  if !self.table_names.include?(table.name)
    raise Exception.new("Table #{ table.name } doesn't exist on the Redshift database, so it can't be recreated. Did you want to use `dataduck create #{ table.name }` instead?")
  end

  recreating_temp_name = "zz_dataduck_recreating_#{ table.name }"
  self.create_output_table_with_name!(table, recreating_temp_name)
  self.query("INSERT INTO #{ recreating_temp_name } (\"#{ table.output_column_names.join('","') }\") SELECT \"#{ table.output_column_names.join('","') }\" FROM #{ table.name }")
  self.query("ALTER TABLE #{ table.name } RENAME TO zz_dataduck_recreating_old_#{ table.name }")
  self.query("ALTER TABLE #{ recreating_temp_name } RENAME TO #{ table.name }")
  self.query("DROP TABLE zz_dataduck_recreating_old_#{ table.name }")
end

#table_names ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 182

def table_names
  self.query("SELECT DISTINCT(tablename) AS name FROM pg_table_def WHERE schemaname='public' ORDER BY name").map { |item| item[:name] }
end

#type_to_redshift_type(which_type) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 116

def type_to_redshift_type(which_type)
  which_type = which_type.to_s

  if ["string", "text", "bigtext"].include?(which_type)
    {
        "string" => "varchar(255)",
        "text" => "varchar(8191)",
        "bigtext" => "varchar(65535)", # Redshift maximum
    }[which_type]
  else
    which_type
  end
end
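
For example:

destination.type_to_redshift_type(:string)   # => "varchar(255)"
destination.type_to_redshift_type("bigtext") # => "varchar(65535)"
destination.type_to_redshift_type(:integer)  # => "integer" (passed through unchanged)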

#upload_table_to_s3!(table) ⇒ Object



# File 'lib/dataduck/redshift_destination.rb', line 186

def upload_table_to_s3!(table)
  now_epoch = Time.now.to_i.to_s
  filepath = "pending/#{ table.name.downcase }_#{ now_epoch }.csv"

  table_csv = self.data_as_csv_string(table.data, table.output_column_names)

  s3_obj = S3Object.new(filepath, table_csv, self.aws_key, self.aws_secret,
      self.s3_bucket, self.s3_region)
  s3_obj.upload!
  return s3_obj
end