Class: PostgresUpsert::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/postgres_upsert/writer.rb

Instance Method Summary collapse

Constructor Details

#initialize(table_name, source, options = {}) ⇒ Writer

Returns a new instance of Writer.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/postgres_upsert/writer.rb', line 8

def initialize(table_name, source, options = {})
  @table_name = table_name
  @options = options.reverse_merge({
    :delimiter => ",", 
    :format => :csv, 
    :header => true, 
    :key_column => primary_key,
    :update_only => false})
  @source = source.instance_of?(String) ? File.open(source, 'r') : source
  @columns_list = get_columns
  generate_temp_table_name
end

Instance Method Details

#writeObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/postgres_upsert/writer.rb', line 21

def write
  if @columns_list.empty? 
    raise "Either the :columns option or :header => true are required"
  end

  csv_options = @options[:format] == :binary ? "BINARY" : "DELIMITER '#{@options[:delimiter]}' CSV"

  copy_table = @temp_table_name

  columns_string = columns_string_for_copy

  base_connection = Sequel.connect(ActiveRecord::Base.connection.config[:url])

  base_connection.synchronize do |connection|
    create_temp_table(connection)
    copy_manager = org.postgresql.copy.CopyManager.new(connection)
    stream = copy_manager.copy_in("COPY #{copy_table} #{columns_string} FROM STDIN WITH #{csv_options}")
    while line = read_input_line do
      next if line.strip.size == 0
      line = line.to_java_bytes
      stream.write_to_copy(line, 0, line.length)
    end
    stream.end_copy
    upsert_from_temp_table(connection)
    drop_temp_table(connection)
  end

end