Class: PostgresToRedshift
- Inherits:
-
Object
- Object
- PostgresToRedshift
- Defined in:
- lib/postgres_to_redshift/table.rb,
lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb
Overview
table_catalog                | postgres_to_redshift
table_schema                 | public
table_name                   | acquisition_pages
table_type                   | BASE TABLE
self_referencing_column_name |
reference_generation         |
user_defined_type_catalog    |
user_defined_type_schema     |
user_defined_type_name       |
is_insertable_into           | YES
is_typed                     | NO
commit_action                |
Defined Under Namespace
Constant Summary collapse
- VERSION =
"0.1.1"
Class Attribute Summary collapse
-
.source_uri ⇒ Object
Returns the value of attribute source_uri.
-
.target_uri ⇒ Object
Returns the value of attribute target_uri.
Instance Attribute Summary collapse
-
#s3 ⇒ Object
readonly
Returns the value of attribute s3.
-
#source_connection ⇒ Object
readonly
Returns the value of attribute source_connection.
-
#target_connection ⇒ Object
readonly
Returns the value of attribute target_connection.
Class Method Summary collapse
Instance Method Summary collapse
- #bucket ⇒ Object
- #column_definitions(table) ⇒ Object
- #copy_table(table) ⇒ Object
- #import_table(table) ⇒ Object
- #tables ⇒ Object
- #upload_table(table, buffer) ⇒ Object
Class Attribute Details
.source_uri ⇒ Object
Returns the value of attribute source_uri.
12 13 14 |
# File 'lib/postgres_to_redshift.rb', line 12
def source_uri
  @source_uri
end
.target_uri ⇒ Object
Returns the value of attribute target_uri.
12 13 14 |
# File 'lib/postgres_to_redshift.rb', line 12
def target_uri
  @target_uri
end
Instance Attribute Details
#s3 ⇒ Object (readonly)
Returns the value of attribute s3.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15
def s3
  @s3
end
#source_connection ⇒ Object (readonly)
Returns the value of attribute source_connection.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15
def source_connection
  @source_connection
end
#target_connection ⇒ Object (readonly)
Returns the value of attribute target_connection.
15 16 17 |
# File 'lib/postgres_to_redshift.rb', line 15
def target_connection
  @target_connection
end
Class Method Details
.source_connection ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/postgres_to_redshift.rb', line 37
# Returns the memoized connection to the source database, opening it on first use.
#
# Connection settings are read from +source_uri+: ENV['USER'] is used when the
# URI carries no user, and the database name is the URI path without its
# leading slash. The session is switched to read-only before the connection is
# cached, so a connection whose SET statement failed is never handed out by a
# later call.
#
# @return [PG::Connection] the shared source connection
# @raise re-raises whatever error the SET statement raised, after closing the
#   connection that was just opened
def self.source_connection
  return @source_connection if instance_variable_defined?(:"@source_connection")

  connection = PG::Connection.new(host: source_uri.host,
                                  port: source_uri.port,
                                  user: source_uri.user || ENV['USER'],
                                  password: source_uri.password,
                                  dbname: source_uri.path[1..-1])
  begin
    connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  rescue StandardError
    # Never cache (or leak) a connection that did not become read-only.
    connection.close
    raise
  end

  @source_connection = connection
end
.target_connection ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/postgres_to_redshift.rb', line 46
# Lazily opens the connection to the target database and reuses it afterwards.
#
# Host, port, user, password and database name are read from +target_uri+;
# ENV['USER'] stands in when the URI has no user, and the database name is the
# URI path minus its leading slash.
def self.target_connection
  return @target_connection if instance_variable_defined?(:"@target_connection")

  settings = {
    host: target_uri.host,
    port: target_uri.port,
    user: target_uri.user || ENV['USER'],
    password: target_uri.password,
    dbname: target_uri.path[1..-1]
  }
  @target_connection = PG::Connection.new(**settings)
end
.update_tables ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/postgres_to_redshift.rb', line 17
# Refreshes, in the target database, every table reported by #tables.
#
# Per table: ensure a target table exists, export the source rows with
# #copy_table, then load them with #import_table.
def self.update_tables
  exporter = PostgresToRedshift.new

  exporter.tables.each do |table|
    # NOTE(review): presumably needed because #import_table renames the
    # existing target table and so requires one to be present.
    create_sql = "CREATE TABLE IF NOT EXISTS public.#{table.target_table_name} (#{table.columns_for_create})"
    target_connection.exec(create_sql)

    exporter.copy_table(table)
    exporter.import_table(table)
  end
end
Instance Method Details
#bucket ⇒ Object
79 80 81 |
# File 'lib/postgres_to_redshift.rb', line 79
# Looks up the S3 bucket named by ENV['S3_DATABASE_EXPORT_BUCKET'] and caches
# the handle after the first lookup.
def bucket
  return @bucket if @bucket

  bucket_name = ENV['S3_DATABASE_EXPORT_BUCKET']
  @bucket = s3.buckets[bucket_name]
end
#column_definitions(table) ⇒ Object
71 72 73 |
# File 'lib/postgres_to_redshift.rb', line 71
# Fetches the information_schema.columns rows for +table+ in the public
# schema, ordered by ordinal position.
#
# The table name is sent as a bound parameter rather than spliced into the
# SQL text, so names containing a single quote cannot break or alter the query.
#
# @param table [#name] table whose columns are looked up
# @return the result of the query on the source connection
def column_definitions(table)
  source_connection.exec_params(
    "SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name=$1 order by ordinal_position",
    [table.name]
  )
end
#copy_table(table) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/postgres_to_redshift.rb', line 83
# Streams +table+ out of the source database as '|'-delimited COPY output,
# gzips it into an in-memory buffer, and passes the rewound buffer to
# #upload_table.
def copy_table(table)
  puts "Downloading #{table}"

  buffer = StringIO.new
  gzip = Zlib::GzipWriter.new(buffer)
  export_sql = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

  source_connection.copy_data(export_sql) do
    # get_copy_data yields a falsy value once the COPY stream is exhausted.
    while (chunk = source_connection.get_copy_data)
      gzip.write(chunk)
    end
  end

  # Flush the gzip trailer, then rewind so the upload reads from the start.
  gzip.finish
  buffer.rewind
  upload_table(table, buffer)
end
#import_table(table) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/postgres_to_redshift.rb', line 106
# Replaces the target table with the freshly exported data.
#
# Any leftover "<name>_updating" table is dropped first. Then, inside one
# transaction, the current table is renamed to "<name>_updating", a new table
# is created, and the gzipped export is loaded from
# s3://$S3_DATABASE_EXPORT_BUCKET/export/<name>.psv.gz with COPY.
#
# If any statement inside the transaction fails, the transaction is rolled
# back before the error is re-raised, so the shared target connection is not
# left inside an aborted transaction.
#
# @param table [#target_table_name, #columns_for_create] table to import
# @return the result of the COMMIT statement
# @raise re-raises the error of the failing statement after attempting ROLLBACK
def import_table(table)
  target = table.target_table_name
  puts "Importing #{target}"

  target_connection.exec("DROP TABLE IF EXISTS public.#{target}_updating")

  target_connection.exec("BEGIN;")
  begin
    target_connection.exec("ALTER TABLE public.#{target} RENAME TO #{target}_updating")
    target_connection.exec("CREATE TABLE public.#{target} (#{table.columns_for_create})")
    target_connection.exec("COPY public.#{target} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{target}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
    target_connection.exec("COMMIT;")
  rescue StandardError => error
    begin
      target_connection.exec("ROLLBACK;")
    rescue StandardError
      # The connection may already be unusable; the original failure is the
      # error worth reporting, so a failed ROLLBACK is deliberately ignored.
    end
    raise error
  end
end
#tables ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/postgres_to_redshift.rb', line 62
# Builds a Table for every base table and view in the source database's
# public schema, skipping names that match /^pg_/, and attaches each table's
# column definitions.
def tables
  rows = source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')")

  rows.each_with_object([]) do |table_attributes, kept|
    table = Table.new(attributes: table_attributes)
    next if table.name =~ /^pg_/

    table.columns = column_definitions(table)
    kept << table
  end
end
#upload_table(table, buffer) ⇒ Object
100 101 102 103 104 |
# File 'lib/postgres_to_redshift.rb', line 100
# Stores +buffer+ in the export bucket under "export/<target_table_name>.psv.gz",
# deleting the object at that key first and writing the new one with the
# :authenticated_read ACL.
def upload_table(table, buffer)
  puts "Uploading #{table.target_table_name}"

  object_key = "export/#{table.target_table_name}.psv.gz"
  bucket.objects[object_key].delete
  bucket.objects[object_key].write(buffer, acl: :authenticated_read)
end