Class: PostgresToRedshift

Inherits:
Object
  • Object
show all
Defined in:
lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb

Constant Summary collapse

VERSION =
"0.0.1"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_uri:) ⇒ PostgresToRedshift

Returns a new instance of PostgresToRedshift.



19
20
21
22
23
24
25
# File 'lib/postgres_to_redshift.rb', line 19

# Opens a read-only connection to the source Postgres database and a
# writable connection to the Redshift target.
#
# @param source_uri [String] Postgres connection URI for the source database
# @raise [KeyError] if the REDSHIFT_URI environment variable is unset
#   (ENV.fetch gives a clear error instead of URI.parse's opaque TypeError on nil)
def initialize(source_uri:)
  source_uri = URI.parse(source_uri)
  target_uri = URI.parse(ENV.fetch('REDSHIFT_URI'))
  @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user, password: source_uri.password, dbname: source_uri.path[1..-1])
  # Guard against accidental writes to the source database.
  @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  @target_connection = PG::Connection.new(host: target_uri.host, port: target_uri.port, user: target_uri.user, password: target_uri.password, dbname: target_uri.path[1..-1])
end

Instance Attribute Details

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.



7
8
9
# File 'lib/postgres_to_redshift.rb', line 7

# Reader for the S3 client handle.
# NOTE(review): @s3 is never assigned in this chunk — presumably set by an
# s3 setup method elsewhere in the class; confirm before relying on it.
def s3
  instance_variable_get(:@s3)
end

#source_connection ⇒ Object (readonly)

Returns the value of attribute source_connection.



7
8
9
# File 'lib/postgres_to_redshift.rb', line 7

# Reader for the source (Postgres) PG::Connection set in #initialize.
def source_connection
  instance_variable_get(:@source_connection)
end

#target_connection ⇒ Object (readonly)

Returns the value of attribute target_connection.



7
8
9
# File 'lib/postgres_to_redshift.rb', line 7

# Reader for the target (Redshift) PG::Connection set in #initialize.
def target_connection
  instance_variable_get(:@target_connection)
end

Class Method Details

.update_tables ⇒ Object



9
10
11
12
13
14
15
16
17
# File 'lib/postgres_to_redshift.rb', line 9

# Entry point: runs a full export/import cycle. ARGV[0] is the source
# database URI; the Redshift target comes from ENV['REDSHIFT_URI'].
def self.update_tables
  exporter = PostgresToRedshift.new(source_uri: ARGV[0])
  exporter.create_new_tables

  # FIXME: BIG WARNING HERE: this order is important. We want the views to overwrite the tables. We should make it so the order doesn't matter later.
  exporter.copy_tables
  exporter.copy_views
  exporter.import_tables
end

Instance Method Details

#bucket ⇒ Object



55
56
57
# File 'lib/postgres_to_redshift.rb', line 55

# Memoized handle to the S3 export bucket named by S3_DATABASE_EXPORT_BUCKET.
def bucket
  return @bucket if @bucket

  @bucket = s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
end

#copy_table(source_table, target_table, is_view = false) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/postgres_to_redshift.rb', line 65

# Streams one relation out of the source database via COPY into an
# in-memory buffer, then uploads it to S3 as a pipe-delimited file.
#
# NOTE(review): the entire table is held in memory before upload — fine for
# small tables, risky for very large ones.
def copy_table(source_table, target_table, is_view = false)
  puts "Downloading #{source_table}"

  # A view cannot be COPYed directly, so wrap it in a SELECT.
  copy_command =
    if is_view
      "COPY (SELECT * FROM #{source_table}) TO STDOUT WITH DELIMITER '|'"
    else
      "COPY #{source_table} TO STDOUT WITH DELIMITER '|'"
    end

  buffer = ""
  source_connection.copy_data(copy_command) do
    # get_copy_data returns nil when the COPY stream is exhausted.
    until (chunk = source_connection.get_copy_data).nil?
      buffer << chunk
    end
  end
  upload_table(target_table, buffer)
end

#copy_tables ⇒ Object



104
105
106
107
108
# File 'lib/postgres_to_redshift.rb', line 104

# Exports every base table in the source database to S3, one file per table.
def copy_tables
  tables.each { |table_name| copy_table(table_name, table_name) }
end

#copy_views ⇒ Object



110
111
112
113
114
115
# File 'lib/postgres_to_redshift.rb', line 110

# Exports every view to S3 under the name of the table it shadows: a view
# named "foo_view" is written to "foo.psv" so its dump overwrites the dump
# of the "foo" base table (the ordering in .update_tables relies on this).
def copy_views
  views.each do |view|
    # Strip only a TRAILING "_view" suffix. The previous gsub removed every
    # "_view" substring, mangling names that contain it in the middle.
    table = view.sub(/_view\z/, '')
    copy_table(view, table, true)
  end
end

#create_new_tables ⇒ Object



59
60
61
62
63
# File 'lib/postgres_to_redshift.rb', line 59

# Creates each source table on the Redshift target if it does not already
# exist, using the translated column definitions from #table_columns.
def create_new_tables
  tables.each do |table_name|
    ddl = "CREATE TABLE IF NOT EXISTS public.#{table_name} (#{table_columns(table_name)})"
    target_connection.exec(ddl)
  end
end

#import_table(target_table) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/postgres_to_redshift.rb', line 89

# Loads the previously-exported export/<table>.psv file from S3 into
# Redshift using a rename-swap: the live table is renamed aside, a fresh
# table is created with current column definitions, and the dump is COPYed in.
#
# NOTE(review): the "<name>_updating" table dropped here is the leftover
# from the PREVIOUS run — after COMMIT, the old data survives as
# "<name>_updating" until the next import.
# NOTE(review): no explicit ROLLBACK on failure; an error between BEGIN and
# COMMIT leaves an open transaction on this connection.
def import_table(target_table)
  puts "Importing #{target_table}"
  target_connection.exec("DROP TABLE IF EXISTS public.#{target_table}_updating")

  target_connection.exec("BEGIN;")

  # Move the current table aside so a fresh, correctly-typed one can replace it.
  target_connection.exec("ALTER TABLE public.#{target_table} RENAME TO #{target_table}_updating")

  target_connection.exec("CREATE TABLE public.#{target_table} (#{table_columns(target_table)})")

  # Redshift COPY from S3. TRUNCATECOLUMNS clips over-long values; ESCAPE
  # honors backslash escapes in the pipe-delimited dump. S3 credentials are
  # interpolated from the environment into the SQL string.
  target_connection.exec("COPY public.#{target_table} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{target_table}.psv' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

  target_connection.exec("COMMIT;")
end

#import_tables ⇒ Object

FIXME: This relies on views being uploaded after tables.



118
119
120
121
122
# File 'lib/postgres_to_redshift.rb', line 118

# Imports every exported table file into Redshift.
# FIXME: relies on view dumps having overwritten table dumps in S3 first.
def import_tables
  tables.each { |table_name| import_table(table_name) }
end

#table_columns(table_name) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/postgres_to_redshift.rb', line 35

# Builds the comma-separated, quoted column-definition list for a CREATE
# TABLE on Redshift, from the table's schema in the SOURCE database.
#
# Types Redshift does not support (text, json, jsonb, bytea, money) are
# mapped to the widest varchar. Matching is on the whole type name: the old
# substring gsub! turned "jsonb" into the invalid "character varying(max)b"
# and also mutated the result-row strings in place.
#
# @param table_name [String] a table in the source public schema
# @return [String] e.g. %Q|"id" integer, "name" character varying(255)|
def table_columns(table_name)
  unsupported = %w[text json jsonb bytea money]
  source_connection.exec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table_name}'").map do |row|
    data_type = row["data_type"]
    data_type = 'character varying(max)' if unsupported.include?(data_type)

    if row["character_maximum_length"].to_s.length > 0
      %Q|"#{row["column_name"]}" #{data_type}(#{row["character_maximum_length"]})|
    else
      %Q|"#{row["column_name"]}" #{data_type}|
    end
  end.join(", ")
end

#tables ⇒ Object



31
32
33
# File 'lib/postgres_to_redshift.rb', line 31

# Names of all base tables in the source database's public schema.
def tables
  query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
  source_connection.exec(query).map { |row| row["table_name"] }
end

#upload_table(target_table, buffer) ⇒ Object



83
84
85
86
87
# File 'lib/postgres_to_redshift.rb', line 83

# Writes the buffered table dump to S3 as export/<table>.psv, deleting any
# previous dump under the same key first.
def upload_table(target_table, buffer)
  puts "Uploading #{target_table}"
  key = "export/#{target_table}.psv"
  bucket.objects[key].delete
  bucket.objects[key].write(buffer, acl: :authenticated_read)
end

#views ⇒ Object



27
28
29
# File 'lib/postgres_to_redshift.rb', line 27

# Names of all views in the source public schema, excluding the
# pg_stat_statements extension view.
def views
  query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'VIEW'"
  names = source_connection.exec(query).map { |row| row["table_name"] }
  names - ["pg_stat_statements"]
end