Class: TableCopy::PG::Destination
- Inherits:
-
Object
- Object
- TableCopy::PG::Destination
- Defined in:
- lib/table_copy/pg/destination.rb
Instance Attribute Summary collapse
-
#conn_method ⇒ Object
readonly
Returns the value of attribute conn_method.
-
#fields ⇒ Object
readonly
Returns the value of attribute fields.
-
#indexes ⇒ Object
readonly
Returns the value of attribute indexes.
-
#sequence_field ⇒ Object
readonly
Returns the value of attribute sequence_field.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
Instance Method Summary collapse
- #copy_data_from(source_table, temp: nil, pk_only: false, update: false) ⇒ Object
- #copy_from_temp(except: except_statement) ⇒ Object
- #create(fields_ddl) ⇒ Object
- #create_indexes ⇒ Object
- #create_temp(fields_ddl) ⇒ Object
- #create_views(views) ⇒ Object
- #delete_not_in_temp ⇒ Object
- #drop(opts = {}) ⇒ Object
-
#initialize(args) ⇒ Destination
constructor
A new instance of Destination.
- #max_sequence ⇒ Object
- #none? ⇒ Boolean
- #query_views ⇒ Object
- #to_s ⇒ Object
- #transaction ⇒ Object
Constructor Details
#initialize(args) ⇒ Destination
6 7 8 9 10 11 12 13 |
# File 'lib/table_copy/pg/destination.rb', line 6 def initialize(args) @table_name = args[:table_name] @primary_key = args[:primary_key] @sequence_field = args[:sequence_field] @conn_method = args[:conn_method] @indexes = args[:indexes] || [] @fields = args[:fields] end |
Instance Attribute Details
#conn_method ⇒ Object (readonly)
Returns the value of attribute conn_method.
4 5 6 |
# File 'lib/table_copy/pg/destination.rb', line 4 def conn_method @conn_method end |
#fields ⇒ Object (readonly)
Returns the value of attribute fields.
4 5 6 |
# File 'lib/table_copy/pg/destination.rb', line 4 def fields @fields end |
#indexes ⇒ Object (readonly)
Returns the value of attribute indexes.
4 5 6 |
# File 'lib/table_copy/pg/destination.rb', line 4 def indexes @indexes end |
#sequence_field ⇒ Object (readonly)
Returns the value of attribute sequence_field.
4 5 6 |
# File 'lib/table_copy/pg/destination.rb', line 4 def sequence_field @sequence_field end |
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
4 5 6 |
# File 'lib/table_copy/pg/destination.rb', line 4 def table_name @table_name end |
Instance Method Details
#copy_data_from(source_table, temp: nil, pk_only: false, update: false) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/table_copy/pg/destination.rb', line 74 def copy_data_from(source_table, temp: nil, pk_only: false, update: false) temp = 'temp_' if temp fl = pk_only ? primary_key : fields_list where = "where #{sequence_field} > '#{update}'" if update && sequence_field count = 0 source_table.copy_from(fl, where) do |source_conn| with_conn do |conn| conn.copy_data("COPY #{temp}#{table_name} (#{fl}) FROM STDOUT CSV") do while row = source_conn.get_copy_data count += 1 conn.put_copy_data(row) end end end end count end |
#copy_from_temp(except: except_statement) ⇒ Object
92 93 94 95 96 |
# File 'lib/table_copy/pg/destination.rb', line 92 def copy_from_temp(except: except_statement) with_conn do |conn| conn.exec(upsert_sql(except)) end end |
#create(fields_ddl) ⇒ Object
28 29 30 31 32 |
# File 'lib/table_copy/pg/destination.rb', line 28 def create(fields_ddl) with_conn do |conn| conn.exec("create table #{table_name} (#{fields_ddl})") end end |
#create_indexes ⇒ Object
41 42 43 44 45 46 47 48 |
# File 'lib/table_copy/pg/destination.rb', line 41 def create_indexes indexes.each do |index| create_ddl = index.class.new(table_name, index.name, index.columns).create with_conn do |conn| conn.exec(create_ddl) end end end |
#create_temp(fields_ddl) ⇒ Object
62 63 64 65 66 |
# File 'lib/table_copy/pg/destination.rb', line 62 def create_temp(fields_ddl) with_conn do |conn| conn.exec("create temp table temp_#{table_name} (#{fields_ddl}) on commit drop") end end |
#create_views(views) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/table_copy/pg/destination.rb', line 110 def create_views(views) with_conn do |conn| views.inject({}) do |result, view| begin conn.exec("create or replace view #{view['viewname']} as (#{view['definition'].gsub(/;\z/, '')})") result[view['viewname']] = true rescue ::PG::UndefinedTable, ::PG::UndefinedColumn => e result[view['viewname']] = false end result end end end |
#delete_not_in_temp ⇒ Object
98 99 100 101 102 |
# File 'lib/table_copy/pg/destination.rb', line 98 def delete_not_in_temp with_conn do |conn| conn.exec("delete from #{table_name} where #{primary_key} in (select #{primary_key} from #{table_name} except select #{primary_key} from temp_#{table_name})") end end |
#drop(opts = {}) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/table_copy/pg/destination.rb', line 34 def drop(opts={}) cascade = ' cascade' if opts[:cascade] with_conn do |conn| conn.exec("#{drop_sql}#{cascade}") end end |
#max_sequence ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/table_copy/pg/destination.rb', line 54 def max_sequence return unless sequence_field with_conn do |conn| row = conn.exec(max_sequence_sql).first row['max'] if row end end |
#none? ⇒ Boolean
68 69 70 71 72 |
# File 'lib/table_copy/pg/destination.rb', line 68 def none? with_conn do |conn| conn.exec("select count(*) from #{table_name}").first['count'] == '0' end end |
#query_views ⇒ Object
104 105 106 107 108 |
# File 'lib/table_copy/pg/destination.rb', line 104 def query_views with_conn do |conn| conn.exec(views_sql) end end |
#to_s ⇒ Object
50 51 52 |
# File 'lib/table_copy/pg/destination.rb', line 50 def to_s table_name end |
#transaction ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/table_copy/pg/destination.rb', line 15 def transaction with_conn do |conn| begin conn.exec('begin') yield conn.exec('commit') rescue Exception => e conn.exec('rollback') raise e end end end |