Class: PostgresToRedshift
- Inherits: Object
  (inheritance chain: Object → PostgresToRedshift)
- Defined in:
- lib/postgres_to_redshift/table.rb,
lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb
Overview
Example row from information_schema.tables (the shape consumed by #tables):

  table_catalog                | postgres_to_redshift
  table_schema                 | public
  table_name                   | acquisition_pages
  table_type                   | BASE TABLE
  self_referencing_column_name |
  reference_generation         |
  user_defined_type_catalog    |
  user_defined_type_schema     |
  user_defined_type_name       |
  is_insertable_into           | YES
  is_typed                     | NO
  commit_action                |
Defined Under Namespace
Constant Summary collapse
Class Attribute Summary collapse
-
.source_uri ⇒ Object
Returns the value of attribute source_uri.
-
.target_uri ⇒ Object
Returns the value of attribute target_uri.
Instance Attribute Summary collapse
-
#s3 ⇒ Object
readonly
Returns the value of attribute s3.
-
#source_connection ⇒ Object
readonly
Returns the value of attribute source_connection.
-
#target_connection ⇒ Object
readonly
Returns the value of attribute target_connection.
Class Method Summary collapse
Instance Method Summary collapse
- #bucket ⇒ Object
- #column_definitions(table) ⇒ Object
- #copy_table(table) ⇒ Object
- #import_table(table) ⇒ Object
- #tables ⇒ Object
- #upload_table(table, buffer, chunk) ⇒ Object
Class Attribute Details
.source_uri ⇒ Object
Returns the value of attribute source_uri.
12 13 14 |
# File 'lib/postgres_to_redshift.rb', line 12

def source_uri
  @source_uri
end
.target_uri ⇒ Object
Returns the value of attribute target_uri.
12 13 14 |
# File 'lib/postgres_to_redshift.rb', line 12

def target_uri
  @target_uri
end
Instance Attribute Details
#s3 ⇒ Object (readonly)
Returns the value of attribute s3.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15

def s3
  @s3
end
#source_connection ⇒ Object (readonly)
Returns the value of attribute source_connection.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15

def source_connection
  @source_connection
end
#target_connection ⇒ Object (readonly)
Returns the value of attribute target_connection.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15

def target_connection
  @target_connection
end
Class Method Details
.source_connection ⇒ Object
41 42 43 44 45 46 47 48 |
# File 'lib/postgres_to_redshift.rb', line 41

def self.source_connection
  unless instance_variable_defined?(:"@source_connection")
    @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port,
                                            user: source_uri.user || ENV['USER'],
                                            password: source_uri.password,
                                            dbname: source_uri.path[1..-1])
    @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  end

  @source_connection
end
.target_connection ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/postgres_to_redshift.rb', line 50

def self.target_connection
  unless instance_variable_defined?(:"@target_connection")
    @target_connection = PG::Connection.new(host: target_uri.host, port: target_uri.port,
                                            user: target_uri.user || ENV['USER'],
                                            password: target_uri.password,
                                            dbname: target_uri.path[1..-1])
  end

  @target_connection
end
.update_tables ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/postgres_to_redshift.rb', line 21

def self.update_tables
  update_tables = PostgresToRedshift.new

  update_tables.tables.each do |table|
    target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

    update_tables.copy_table(table)
    update_tables.import_table(table)
  end
end
Instance Method Details
#bucket ⇒ Object
83 84 85 |
# File 'lib/postgres_to_redshift.rb', line 83

def bucket
  @bucket ||= s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
end
#column_definitions(table) ⇒ Object
75 76 77 |
# File 'lib/postgres_to_redshift.rb', line 75

def column_definitions(table)
  source_connection.exec("SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table.name}' order by ordinal_position")
end
#copy_table(table) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/postgres_to_redshift.rb', line 87

def copy_table(table)
  tmpfile = Tempfile.new("psql2rs")
  zip = Zlib::GzipWriter.new(tmpfile)
  chunksize = 5 * GIGABYTE # uncompressed
  chunk = 1
  bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
  begin
    puts "Downloading #{table}"
    copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

    source_connection.copy_data(copy_command) do
      while row = source_connection.get_copy_data
        zip.write(row)
        if (zip.pos > chunksize)
          zip.finish
          tmpfile.rewind
          upload_table(table, tmpfile, chunk)
          chunk += 1
          zip.close unless zip.closed?
          tmpfile.unlink
          tmpfile = Tempfile.new("psql2rs")
          zip = Zlib::GzipWriter.new(tmpfile)
        end
      end
    end
    zip.finish
    tmpfile.rewind
    upload_table(table, tmpfile, chunk)
  ensure
    zip.close unless zip.closed?
    tmpfile.unlink
  end
end
#import_table(table) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/postgres_to_redshift.rb', line 126

def import_table(table)
  puts "Importing #{table.target_table_name}"

  target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")

  target_connection.exec("BEGIN;")

  target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")

  target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

  target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

  target_connection.exec("COMMIT;")
end
#tables ⇒ Object
66 67 68 69 70 71 72 73 |
# File 'lib/postgres_to_redshift.rb', line 66

def tables
  source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')").map do |table_attributes|
    table = Table.new(attributes: table_attributes)
    next if table.name =~ /^pg_/
    table.columns = column_definitions(table)
    table
  end.compact
end
#upload_table(table, buffer, chunk) ⇒ Object
121 122 123 124 |
# File 'lib/postgres_to_redshift.rb', line 121

def upload_table(table, buffer, chunk)
  puts "Uploading #{table.target_table_name}.#{chunk}"
  bucket.objects["export/#{table.target_table_name}.psv.gz.#{chunk}"].write(buffer, acl: :authenticated_read)
end