Class: PostgresToRedshift
- Inherits:
-
Object
- Object
- PostgresToRedshift
- Defined in:
- lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb
Constant Summary collapse
- VERSION =
"0.0.1"
Instance Attribute Summary collapse
-
#s3 ⇒ Object
readonly
Returns the value of attribute s3.
-
#source_connection ⇒ Object
readonly
Returns the value of attribute source_connection.
-
#target_connection ⇒ Object
readonly
Returns the value of attribute target_connection.
Class Method Summary collapse
Instance Method Summary collapse
- #bucket ⇒ Object
- #copy_table(source_table, target_table, is_view = false) ⇒ Object
- #copy_tables ⇒ Object
- #copy_views ⇒ Object
- #create_new_tables ⇒ Object
- #import_table(target_table) ⇒ Object
-
#import_tables ⇒ Object
FIXME: This relies on views being uploaded after tables.
-
#initialize(source_uri:) ⇒ PostgresToRedshift
constructor
A new instance of PostgresToRedshift.
- #table_columns(table_name) ⇒ Object
- #tables ⇒ Object
- #upload_table(target_table, buffer) ⇒ Object
- #views ⇒ Object
Constructor Details
#initialize(source_uri:) ⇒ PostgresToRedshift
Returns a new instance of PostgresToRedshift.
19 20 21 22 23 24 25 |
# File 'lib/postgres_to_redshift.rb', line 19
# Builds a read-only connection to the source Postgres database and a
# writable connection to the Redshift target.
#
# @param source_uri [String] connection URI for the source database
# @raise [KeyError] when REDSHIFT_URI is not set — ENV.fetch fails fast with
#   the missing key's name instead of a confusing TypeError from URI.parse(nil)
def initialize(source_uri:)
  source_uri = URI.parse(source_uri)
  target_uri = URI.parse(ENV.fetch('REDSHIFT_URI'))
  @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user, password: source_uri.password, dbname: source_uri.path[1..-1])
  # Guard against accidental writes to the source database.
  @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  @target_connection = PG::Connection.new(host: target_uri.host, port: target_uri.port, user: target_uri.user, password: target_uri.password, dbname: target_uri.path[1..-1])
  # NOTE(review): @s3 is exposed via attr_reader but not assigned here — confirm where it is initialized.
end
Instance Attribute Details
#s3 ⇒ Object (readonly)
Returns the value of attribute s3.
7 8 9 |
# File 'lib/postgres_to_redshift.rb', line 7 def s3 @s3 end |
#source_connection ⇒ Object (readonly)
Returns the value of attribute source_connection.
7 8 9 |
# File 'lib/postgres_to_redshift.rb', line 7 def source_connection @source_connection end |
#target_connection ⇒ Object (readonly)
Returns the value of attribute target_connection.
7 8 9 |
# File 'lib/postgres_to_redshift.rb', line 7 def target_connection @target_connection end |
Class Method Details
.update_tables ⇒ Object
9 10 11 12 13 14 15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 9
# Entry point: runs one full export/import cycle, reading the source
# database URI from ARGV[0].
def self.update_tables
  worker = PostgresToRedshift.new(source_uri: ARGV[0])
  worker.create_new_tables
  # FIXME: BIG WARNING HERE: this order is important. We want the views to
  # overwrite the tables. We should make it so the order doesn't matter later.
  worker.copy_tables
  worker.copy_views
  worker.import_tables
end
Instance Method Details
#bucket ⇒ Object
55 56 57 |
# File 'lib/postgres_to_redshift.rb', line 55
# Memoized handle to the S3 export bucket named by S3_DATABASE_EXPORT_BUCKET.
def bucket
  @bucket ||= begin
    bucket_name = ENV['S3_DATABASE_EXPORT_BUCKET']
    s3.buckets[bucket_name]
  end
end
#copy_table(source_table, target_table, is_view = false) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/postgres_to_redshift.rb', line 65
# Streams one table (or view) out of the source database via COPY and
# hands the pipe-delimited data to #upload_table.
#
# @param source_table [String] relation to read from
# @param target_table [String] name used for the uploaded .psv object
# @param is_view [Boolean] views need COPY (SELECT ...) since plain COPY
#   only accepts tables
def copy_table(source_table, target_table, is_view = false)
  data = ""
  puts "Downloading #{source_table}"
  copy_command =
    if is_view
      "COPY (SELECT * FROM #{source_table}) TO STDOUT WITH DELIMITER '|'"
    else
      "COPY #{source_table} TO STDOUT WITH DELIMITER '|'"
    end
  source_connection.copy_data(copy_command) do
    loop do
      row = source_connection.get_copy_data
      break if row.nil?
      # NOTE(review): whole table is buffered in memory before upload —
      # large tables may exhaust RAM.
      data << row
    end
  end
  upload_table(target_table, data)
end
#copy_tables ⇒ Object
104 105 106 107 108 |
# File 'lib/postgres_to_redshift.rb', line 104
# Exports every base table under its own name.
def copy_tables
  tables.each { |table_name| copy_table(table_name, table_name) }
end
#copy_views ⇒ Object
110 111 112 113 114 115 |
# File 'lib/postgres_to_redshift.rb', line 110
# Exports each view under the name of the table it shadows, by stripping
# a trailing "_view" from the view name.
#
# Fixed: the original gsub(/_view/, '') removed "_view" ANYWHERE in the
# name (e.g. "page_views" became "page_s"); sub with \z strips only the
# suffix, which is the naming convention this relies on.
def copy_views
  views.each do |view|
    table = view.sub(/_view\z/, '')
    copy_table(view, table, true)
  end
end
#create_new_tables ⇒ Object
59 60 61 62 63 |
# File 'lib/postgres_to_redshift.rb', line 59
# Creates each source table on the target (if absent) using the
# Redshift-compatible column list from #table_columns.
def create_new_tables
  tables.each do |table_name|
    columns = table_columns(table_name)
    target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{table_name} (#{columns})")
  end
end
#import_table(target_table) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/postgres_to_redshift.rb', line 89 def import_table(target_table) puts "Importing #{target_table}" target_connection.exec("DROP TABLE IF EXISTS public.#{target_table}_updating") target_connection.exec("BEGIN;") target_connection.exec("ALTER TABLE public.#{target_table} RENAME TO #{target_table}_updating") target_connection.exec("CREATE TABLE public.#{target_table} (#{table_columns(target_table)})") target_connection.exec("COPY public.#{target_table} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{target_table}.psv' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' TRUNCATECOLUMNS ESCAPE DELIMITER as '|';") target_connection.exec("COMMIT;") end |
#import_tables ⇒ Object
FIXME: This relies on views being uploaded after tables.
118 119 120 121 122 |
# File 'lib/postgres_to_redshift.rb', line 118
# Imports every exported table into the target.
# FIXME: This relies on views being uploaded after tables.
def import_tables
  tables.each { |table_name| import_table(table_name) }
end
#table_columns(table_name) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/postgres_to_redshift.rb', line 35
# Builds a Redshift-compatible column definition list for a table by
# reading information_schema on the source database.
#
# Fixed: the original substring gsub! calls corrupted types — e.g. a
# "jsonb" column became "character varying(max)b", which is invalid DDL.
# An exact-match lookup rewrites only whole type names, and jsonb is now
# handled explicitly.
#
# @param table_name [String] table to describe
# @return [String] comma-joined '"name" type' fragments for CREATE TABLE
def table_columns(table_name)
  # Postgres types Redshift cannot store natively are widened to VARCHAR(MAX).
  redshift_type = {
    'text'  => 'character varying(max)',
    'json'  => 'character varying(max)',
    'jsonb' => 'character varying(max)',
    'bytea' => 'character varying(max)',
    'money' => 'character varying(max)'
  }
  # NOTE(review): table_name is interpolated into SQL unescaped — safe only
  # because names come from information_schema, not user input.
  source_connection.exec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table_name}'").map do |row|
    data_type = redshift_type.fetch(row["data_type"], row["data_type"])
    if row["character_maximum_length"].to_s.length > 0
      %Q|"#{row["column_name"]}" #{data_type}(#{row["character_maximum_length"]})|
    else
      %Q|"#{row["column_name"]}" #{data_type}|
    end
  end.join(", ")
end
#tables ⇒ Object
31 32 33 |
# File 'lib/postgres_to_redshift.rb', line 31
# Names of all base tables in the source database's public schema.
def tables
  query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
  source_connection.exec(query).map { |row| row["table_name"] }
end
#upload_table(target_table, buffer) ⇒ Object
83 84 85 86 87 |
# File 'lib/postgres_to_redshift.rb', line 83
# Replaces the table's .psv object in the export bucket with fresh data.
def upload_table(target_table, buffer)
  puts "Uploading #{target_table}"
  object = bucket.objects["export/#{target_table}.psv"]
  object.delete
  object.write(buffer, acl: :authenticated_read)
end
#views ⇒ Object
27 28 29 |
# File 'lib/postgres_to_redshift.rb', line 27
# Names of all views in the public schema, excluding the
# pg_stat_statements extension view.
def views
  query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'VIEW'"
  view_names = source_connection.exec(query).map { |row| row["table_name"] }
  view_names - ["pg_stat_statements"]
end