Class: PostgresToRedshift

Inherits:
Object
  • Object
show all
Defined in:
lib/postgres_to_redshift/table.rb,
lib/postgres_to_redshift.rb,
lib/postgres_to_redshift/version.rb

Overview

Example row from information_schema.tables (psql expanded display):

table_catalog                | postgres_to_redshift
table_schema                 | public
table_name                   | acquisition_pages
table_type                   | BASE TABLE
self_referencing_column_name |
reference_generation         |
user_defined_type_catalog    |
user_defined_type_schema     |
user_defined_type_name       |
is_insertable_into           | YES
is_typed                     | NO
commit_action                |

Defined Under Namespace

Classes: Column, Table

Constant Summary collapse

VERSION =
"0.1.1"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.source_uri ⇒ Object

Returns the value of attribute source_uri.



12
13
14
# File 'lib/postgres_to_redshift.rb', line 12

# Reader for the class-level +source_uri+ attribute.
#
# @return [Object] the value held in @source_uri (nil if it was never assigned)
def source_uri
  @source_uri
end

.target_uri ⇒ Object

Returns the value of attribute target_uri.



12
13
14
# File 'lib/postgres_to_redshift.rb', line 12

# Reader for the class-level +target_uri+ attribute.
#
# @return [Object] the value held in @target_uri (nil if it was never assigned)
def target_uri
  @target_uri
end

Instance Attribute Details

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Read-only accessor for the +s3+ attribute.
#
# @return [Object] the value held in @s3 (nil if it was never assigned)
def s3
  @s3
end

#source_connection ⇒ Object (readonly)

Returns the value of attribute source_connection.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Read-only accessor for the instance's +source_connection+ attribute.
#
# @return [Object] the value held in @source_connection (nil if it was never assigned)
def source_connection
  @source_connection
end

#target_connection ⇒ Object (readonly)

Returns the value of attribute target_connection.



15
16
17
# File 'lib/postgres_to_redshift.rb', line 15

# Read-only accessor for the instance's +target_connection+ attribute.
#
# @return [Object] the value held in @target_connection (nil if it was never assigned)
def target_connection
  @target_connection
end

Class Method Details

.source_connection ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/postgres_to_redshift.rb', line 37

# Returns the shared connection to the source database, opening it on the
# first call.
#
# Connection settings come from +source_uri+: host, port, user, password and
# the URI path with its leading "/" removed as the database name. When the
# URI carries no user, ENV['USER'] is used instead.
#
# The session is switched to read-only *before* the connection is cached.
# If that statement fails, the connection is closed and nothing is cached,
# so a later call can never hand out a connection that is still writable.
#
# @return [PG::Connection] the cached, read-only source connection
# @raise [StandardError] whatever PG::Connection.new or the SET statement raises
def self.source_connection
  return @source_connection if instance_variable_defined?(:@source_connection)

  connection = PG::Connection.new(
    host: source_uri.host,
    port: source_uri.port,
    user: source_uri.user || ENV['USER'],
    password: source_uri.password,
    dbname: source_uri.path[1..-1]
  )

  begin
    connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  rescue StandardError
    # Never leak or cache a connection that could not be made read-only.
    connection.finish
    raise
  end

  @source_connection = connection
end

.target_connection ⇒ Object



46
47
48
49
50
51
52
# File 'lib/postgres_to_redshift.rb', line 46

# Returns the shared connection to the target database, creating it the
# first time it is requested.
#
# Host, port, user, password and database name (URI path minus its leading
# "/") are taken from +target_uri+; ENV['USER'] stands in for a missing user.
#
# @return [PG::Connection] the memoized target connection
def self.target_connection
  return @target_connection if instance_variable_defined?(:@target_connection)

  uri = target_uri
  @target_connection = PG::Connection.new(
    host: uri.host,
    port: uri.port,
    user: uri.user || ENV['USER'],
    password: uri.password,
    dbname: uri.path[1..-1]
  )
end

.update_tables ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/postgres_to_redshift.rb', line 17

# Refreshes every table reported by a new PostgresToRedshift instance.
#
# For each table, in order: makes sure the table exists on the target
# (CREATE TABLE IF NOT EXISTS), then calls #copy_table and #import_table
# on the instance.
#
# @return [Object] the collection returned by #tables
def self.update_tables
  migrator = PostgresToRedshift.new

  migrator.tables.each do |table|
    create_sql = "CREATE TABLE IF NOT EXISTS public.#{table.target_table_name} (#{table.columns_for_create})"
    target_connection.exec(create_sql)

    migrator.copy_table(table)
    migrator.import_table(table)
  end
end

Instance Method Details

#bucket ⇒ Object



79
80
81
# File 'lib/postgres_to_redshift.rb', line 79

# Looks up the export bucket on +s3+ and remembers it for later calls.
#
# The bucket name is read from ENV['S3_DATABASE_EXPORT_BUCKET'] at the time
# of the first successful lookup.
#
# @return [Object] the memoized result of s3.buckets[<bucket name>]
def bucket
  return @bucket if @bucket

  bucket_name = ENV['S3_DATABASE_EXPORT_BUCKET']
  @bucket = s3.buckets[bucket_name]
end

#column_definitions(table) ⇒ Object



71
72
73
# File 'lib/postgres_to_redshift.rb', line 71

# Fetches the information_schema.columns rows for one table in the public
# schema, ordered by ordinal_position.
#
# The table name is sent as a bound parameter ($1) instead of being spliced
# into the SQL text, so names containing quotes or other special characters
# are matched literally and cannot alter the query.
#
# @param table [#name] the table whose columns are looked up
# @return [Object] the result of the query on +source_connection+
def column_definitions(table)
  source_connection.exec_params(
    "SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name=$1 order by ordinal_position",
    [table.name]
  )
end

#copy_table(table) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/postgres_to_redshift.rb', line 83

# Streams one table out of the source database, gzips it in memory and hands
# the compressed buffer to #upload_table.
#
# Rows are produced by a pipe-delimited COPY ... TO STDOUT over the columns
# given by table.columns_for_copy. The gzip writer is finished in an +ensure+
# so it is always closed, even when the COPY raises; in that case the error
# propagates and nothing is uploaded.
#
# @param table [#name, #columns_for_copy] the table to export
# @return [Object] whatever #upload_table returns
# @raise [StandardError] any error raised while copying or uploading
def copy_table(table)
  buffer = StringIO.new
  zip = Zlib::GzipWriter.new(buffer)

  begin
    # NOTE(review): this interpolates the table object itself, unlike the
    # upload/import log lines, which print target_table_name. Confirm it
    # renders a readable name.
    puts "Downloading #{table}"
    copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

    source_connection.copy_data(copy_command) do
      while (row = source_connection.get_copy_data)
        zip.write(row)
      end
    end
  ensure
    # Writes the gzip trailer and closes the writer; the StringIO stays open.
    zip.finish
  end

  buffer.rewind
  upload_table(table, buffer)
end

#import_table(table) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/postgres_to_redshift.rb', line 106

# Replaces a target table with the freshly exported data from S3.
#
# Steps, all on +target_connection+:
# 1. Drop any leftover <name>_updating table (outside the transaction).
# 2. Inside one transaction: rename the live table to <name>_updating,
#    recreate it from table.columns_for_create, COPY the gzipped export
#    from S3 into it, and commit.
#
# If any statement inside the transaction fails, a ROLLBACK is issued before
# the error is re-raised, so the connection is not left inside an aborted
# transaction for the next table.
#
# Bucket name and AWS credentials are read from the S3_DATABASE_EXPORT_BUCKET,
# S3_DATABASE_EXPORT_ID and S3_DATABASE_EXPORT_KEY environment variables.
#
# @param table [#target_table_name, #columns_for_create] the table to import
# @return [Object] the result of the COMMIT statement
# @raise [StandardError] the original failure, after the rollback was sent
def import_table(table)
  target_name = table.target_table_name
  puts "Importing #{target_name}"
  target_connection.exec("DROP TABLE IF EXISTS public.#{target_name}_updating")

  s3_path = "s3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{target_name}.psv.gz"
  credentials = "aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}"

  target_connection.exec("BEGIN;")
  begin
    target_connection.exec("ALTER TABLE public.#{target_name} RENAME TO #{target_name}_updating")
    target_connection.exec("CREATE TABLE public.#{target_name} (#{table.columns_for_create})")
    target_connection.exec("COPY public.#{target_name} FROM '#{s3_path}' CREDENTIALS '#{credentials}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
    target_connection.exec("COMMIT;")
  rescue StandardError
    # Leave the shared connection usable for the remaining tables.
    target_connection.exec("ROLLBACK;")
    raise
  end
end

#tables ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/postgres_to_redshift.rb', line 62

# Lists the base tables and views of the source database's public schema as
# Table objects, each with its column definitions attached.
#
# Entries whose name matches /^pg_/ are left out, and no column lookup is
# made for them.
#
# @return [Array<Table>] the tables to migrate, in query order
def tables
  rows = source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')")

  rows.each_with_object([]) do |row, found|
    candidate = Table.new(attributes: row)
    unless candidate.name =~ /^pg_/
      candidate.columns = column_definitions(candidate)
      found << candidate
    end
  end
end

#upload_table(table, buffer) ⇒ Object



100
101
102
103
104
# File 'lib/postgres_to_redshift.rb', line 100

# Stores a table export in the bucket under export/<target name>.psv.gz.
#
# Any existing object at that key is deleted first; the new object is then
# written with the :authenticated_read ACL.
#
# @param table [#target_table_name] the table the export belongs to
# @param buffer [Object] the data handed to the object's #write
# @return [Object] whatever the object's #write returns
def upload_table(table, buffer)
  target_name = table.target_table_name
  puts "Uploading #{target_name}"

  object_key = "export/#{target_name}.psv.gz"
  bucket.objects[object_key].delete
  bucket.objects[object_key].write(buffer, acl: :authenticated_read)
end