Class: PostgresToRedshift

Inherits:
Object
  • Object
show all
Defined in:
lib/postgres_to_redshift/table.rb,
lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb

Overview

Example row from `information_schema.tables` (psql expanded display):

    table_catalog                | postgres_to_redshift
    table_schema                 | public
    table_name                   | acquisition_pages
    table_type                   | BASE TABLE
    self_referencing_column_name |
    reference_generation         |
    user_defined_type_catalog    |
    user_defined_type_schema     |
    user_defined_type_name       |
    is_insertable_into           | YES
    is_typed                     | NO
    commit_action                |

Defined Under Namespace

Classes: Column, Table

Constant Summary collapse

KILOBYTE =
1024
MEGABYTE =
KILOBYTE * 1024
GIGABYTE =
MEGABYTE * 1024
VERSION =
"0.1.2"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.source_uriObject

Returns the value of attribute source_uri.



12
13
14
# File 'lib/postgres_to_redshift.rb', line 12

# Returns the configured source database URI.
#
# @return [URI, nil] the value of +@source_uri+
def source_uri
  instance_variable_get(:@source_uri)
end

.target_uriObject

Returns the value of attribute target_uri.



12
13
14
# File 'lib/postgres_to_redshift.rb', line 12

# Returns the configured target (Redshift) database URI.
#
# @return [URI, nil] the value of +@target_uri+
def target_uri
  instance_variable_get(:@target_uri)
end

Instance Attribute Details

#s3Object (readonly)

Returns the value of attribute s3.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Returns the S3 client handle used for export uploads.
#
# @return [Object, nil] the value of +@s3+
def s3
  instance_variable_get(:@s3)
end

#source_connectionObject (readonly)

Returns the value of attribute source_connection.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Returns the memoized source Postgres connection.
#
# @return [Object, nil] the value of +@source_connection+
def source_connection
  instance_variable_get(:@source_connection)
end

#target_connectionObject (readonly)

Returns the value of attribute target_connection.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Returns the memoized target Redshift connection.
#
# @return [Object, nil] the value of +@target_connection+
def target_connection
  instance_variable_get(:@target_connection)
end

Class Method Details

.source_connectionObject



41
42
43
44
45
46
47
48
# File 'lib/postgres_to_redshift.rb', line 41

# Lazily opens (and memoizes) a read-only connection to the source
# Postgres database described by +source_uri+.
#
# The connection is forced into READ ONLY transaction mode so an
# export run can never mutate the source database.
#
# @return [PG::Connection] the shared source connection
def self.source_connection
  unless defined?(@source_connection)
    @source_connection = PG::Connection.new(
      host: source_uri.host,
      port: source_uri.port,
      user: source_uri.user || ENV['USER'],
      password: source_uri.password,
      dbname: source_uri.path[1..-1]
    )
    @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  end

  @source_connection
end

.target_connectionObject



50
51
52
53
54
55
56
# File 'lib/postgres_to_redshift.rb', line 50

# Lazily opens (and memoizes) a connection to the target Redshift
# cluster described by +target_uri+.
#
# @return [PG::Connection] the shared target connection
def self.target_connection
  unless defined?(@target_connection)
    @target_connection = PG::Connection.new(
      host: target_uri.host,
      port: target_uri.port,
      user: target_uri.user || ENV['USER'],
      password: target_uri.password,
      dbname: target_uri.path[1..-1]
    )
  end

  @target_connection
end

.update_tablesObject



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/postgres_to_redshift.rb', line 21

# Top-level entry point: for every exportable table in the source
# database, ensure the target table exists on Redshift, export the
# data to S3, then import it into Redshift.
def self.update_tables
  runner = PostgresToRedshift.new

  runner.tables.each do |table|
    create_sql = "CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})"
    target_connection.exec(create_sql)

    runner.copy_table(table)
    runner.import_table(table)
  end
end

Instance Method Details

#bucketObject



83
84
85
# File 'lib/postgres_to_redshift.rb', line 83

# Memoized handle to the S3 bucket named by S3_DATABASE_EXPORT_BUCKET,
# used as the staging area for exported table data.
#
# @return [Object] the S3 bucket object
def bucket
  return @bucket if @bucket

  @bucket = s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
end

#column_definitions(table) ⇒ Object



75
76
77
# File 'lib/postgres_to_redshift.rb', line 75

# Fetches the column metadata for +table+ from the source database,
# ordered by ordinal position.
#
# @param table [Table] the table whose columns are being described
# @return [PG::Result] one row per column from information_schema.columns
def column_definitions(table)
  # Bind the table name as a query parameter instead of interpolating it
  # into the SQL text, so unusual table names (quotes, etc.) cannot
  # break or inject into the query.
  source_connection.exec_params(
    "SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name=$1 order by ordinal_position",
    [table.name]
  )
end

#copy_table(table) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/postgres_to_redshift.rb', line 87

# Streams +table+'s rows out of Postgres via COPY ... TO STDOUT,
# gzip-compressing into a tempfile, and uploads the result to S3 in
# numbered chunks so no single chunk exceeds the uncompressed limit.
#
# @param table [Table] the table to export
def copy_table(table)
  tmpfile = Tempfile.new("psql2rs")
  zip = Zlib::GzipWriter.new(tmpfile)
  chunksize = 5 * GIGABYTE # uncompressed
  chunk = 1
  # Remove any stale chunks from a previous export of this table.
  bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
  begin
    puts "Downloading #{table}"
    copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

    source_connection.copy_data(copy_command) do
      while row = source_connection.get_copy_data
        zip.write(row)
        # GzipWriter#pos tracks uncompressed bytes written; once the
        # chunk limit is crossed, flush/upload this chunk and rotate to
        # a fresh tempfile + gzip stream for the next one.
        if (zip.pos > chunksize)
          zip.finish
          tmpfile.rewind
          upload_table(table, tmpfile, chunk)
          chunk += 1
          zip.close unless zip.closed?
          tmpfile.unlink
          tmpfile = Tempfile.new("psql2rs")
          zip = Zlib::GzipWriter.new(tmpfile)
        end
      end
    end
    # Flush and upload the final (possibly only) chunk.
    zip.finish
    tmpfile.rewind
    upload_table(table, tmpfile, chunk)
  ensure
    # Always release the gzip stream and delete the tempfile, even when
    # the COPY or an upload raises.
    zip.close unless zip.closed?
    tmpfile.unlink
  end
end

#import_table(table) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/postgres_to_redshift.rb', line 126

# Swaps freshly exported data into +table+ on Redshift: renames the
# live table aside to "<name>_updating", recreates it, and COPYs the
# exported chunks from S3 into it, all inside one transaction.
#
# @param table [Table] the table being imported
def import_table(table)
  puts "Importing #{table.target_table_name}"
  # Quote both identifiers consistently (the original quoted only some
  # statements, which would break for names needing quoting).
  target_name = target_connection.quote_ident(table.target_table_name)
  staging_name = target_connection.quote_ident("#{table.target_table_name}_updating")

  # Drop the leftover staging table from any previous run; done outside
  # the transaction so the cleanup cannot hold locks on failure.
  target_connection.exec("DROP TABLE IF EXISTS public.#{staging_name}")

  target_connection.exec("BEGIN;")

  target_connection.exec("ALTER TABLE public.#{target_name} RENAME TO #{staging_name}")

  target_connection.exec("CREATE TABLE public.#{target_name} (#{table.columns_for_create})")

  # NOTE(review): AWS credentials are interpolated into the SQL text and
  # may surface in query logs; consider IAM-role COPY instead.
  target_connection.exec("COPY public.#{target_name} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

  target_connection.exec("COMMIT;")
end

#tablesObject



66
67
68
69
70
71
72
73
# File 'lib/postgres_to_redshift.rb', line 66

# Enumerates the exportable tables and views in the source database's
# public schema, skipping Postgres system relations, and attaches each
# table's column definitions.
#
# @return [Array<Table>] the tables to export
def tables
  rows = source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')")
  rows.each_with_object([]) do |row, result|
    table = Table.new(attributes: row)
    # System relations (pg_*) are never exported.
    next if table.name =~ /^pg_/

    table.columns = column_definitions(table)
    result << table
  end
end

#upload_table(table, buffer, chunk) ⇒ Object



121
122
123
124
# File 'lib/postgres_to_redshift.rb', line 121

# Uploads one gzip-compressed export chunk to S3 under a numbered key.
#
# @param table [Table] the table the chunk belongs to
# @param buffer [IO] rewound IO containing the gzipped chunk data
# @param chunk [Integer] 1-based chunk number
def upload_table(table, buffer, chunk)
  puts "Uploading #{table.target_table_name}.#{chunk}"
  object_key = "export/#{table.target_table_name}.psv.gz.#{chunk}"
  bucket.objects[object_key].write(buffer, acl: :authenticated_read)
end