Class: Remi::DataTarget::Postgres

Inherits:
Object
  • Object
show all
Includes:
Remi::DataTarget
Defined in:
lib/remi/data_target/postgres.rb

Instance Method Summary collapse

Methods included from Remi::DataSubject

#df, #df=, #field_symbolizer, #fields

Constructor Details

#initialize(credentials:, table_name:, fields:, logger: Remi::Settings.logger) ⇒ Postgres

Returns a new instance of Postgres.



6
7
8
9
10
11
# File 'lib/remi/data_target/postgres.rb', line 6

# Builds a Postgres data target; no database work happens here.
#
# @param credentials [Hash] connection settings read later by #connection
#   (:host, :port, :dbname, :user, :password, :sslmode)
# @param table_name [String] name of the table that will be created and loaded
# @param fields [Hash] field name => metadata hash; the :type entry of each
#   metadata hash is used when the table DDL is generated
# @param logger [#info] receives progress messages emitted by #load
def initialize(credentials:, table_name:, fields:, logger: Remi::Settings.logger)
  @credentials, @table_name = credentials, table_name
  @fields, @logger = fields, logger
end

Instance Method Details

#connection ⇒ Object



24
25
26
27
28
29
30
31
32
33
# File 'lib/remi/data_target/postgres.rb', line 24

# Returns the database connection, opening it with PG.connect on first use
# and reusing the same object afterwards.
#
# Settings missing from @credentials fall back to: host 'localhost',
# port 5432, user = output of the `whoami` shell command, sslmode 'allow'.
# :dbname and :password are passed through as given (possibly nil).
#
# @return [Object] whatever PG.connect returns
def connection
  return @connection if @connection

  host     = @credentials[:host] || 'localhost'
  port     = @credentials[:port] || 5432
  dbname   = @credentials[:dbname]
  # The shell is only consulted when no user was configured.
  user     = @credentials[:user] || `whoami`.chomp
  password = @credentials[:password]
  sslmode  = @credentials[:sslmode] || 'allow'

  settings = {
    host: host, port: port, dbname: dbname,
    user: user, password: password, sslmode: sslmode
  }
  @connection = PG.connect(**settings)
end

#create_target_table ⇒ Object



40
41
42
43
44
45
46
# File 'lib/remi/data_target/postgres.rb', line 40

# Issues a CREATE TEMPORARY TABLE statement for @table_name on the current
# connection, with the column list produced by #fields_with_type_ddl.
#
# The table name and column definitions are interpolated into the SQL
# verbatim (no quoting is applied here).
#
# @return [Object] the result of connection.exec
def create_target_table
  target = connection
  ddl = <<-EOT
    CREATE TEMPORARY TABLE #{@table_name} (
      #{fields_with_type_ddl}
    )
  EOT
  target.exec(ddl)
end

#fields_with_type_ddl ⇒ Object



36
37
38
# File 'lib/remi/data_target/postgres.rb', line 36

# Renders the column list used in the CREATE TABLE statement.
#
# Each entry of @fields becomes "<name> <type>", where <type> is the :type
# value of that field's metadata; entries are joined with ", ".
#
# @return [String] e.g. "id integer, name text"
def fields_with_type_ddl
  column_defs = @fields.each_with_object([]) do |(name, meta), defs|
    defs << "#{name} #{meta[:type]}"
  end
  column_defs.join(', ')
end

#load ⇒ Object



13
14
15
16
17
18
19
20
21
# File 'lib/remi/data_target/postgres.rb', line 13

# Creates the target table and copies the data frame into it.
#
# Does nothing when a previous call already completed the load or when the
# data frame has no rows. Otherwise logs an info message, creates the table,
# loads it, and remembers that the load has been done.
#
# @return [true] in every non-raising case
def load
  return true if @loaded
  return true if df.size == 0

  @logger.info "Performing postgres load to table #{@table_name}"
  create_target_table
  load_target_table
  @loaded = true
end

#load_target_table ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/remi/data_target/postgres.rb', line 48

# Streams every row of the data frame into the target table using
# PostgreSQL's COPY ... FROM STDIN in its default text format.
#
# One line is sent per row: the values of the configured fields, in
# @fields key order, separated by TAB and terminated by a newline.
# Rows are obtained from df.each(:row) and indexed by field key.
#
# @return [Object] the result of connection.copy_data
def load_target_table
  columns = @fields.keys

  connection.copy_data "COPY #{@table_name} (#{columns.join(', ')}) FROM STDIN" do
    df.each(:row) do |row|
      line = columns.map { |column| copy_text_value(row[column]) }.join("\t")
      connection.put_copy_data "#{line}\n"
    end
  end
end

# Internal helper: serializes a single value as one COPY text-format field.
#
# * values responding to #strftime  -> 'YYYY-MM-DD HH:MM:SS'
# * values responding to #map       -> their JSON form (escaped)
# * nil                             -> \N (COPY's NULL marker)
# * other blank values              -> empty field
# * anything else                   -> #to_s (escaped)
#
# NOTE(review): #blank? is not core Ruby; presumably supplied by
# ActiveSupport elsewhere in the project - confirm. Under ActiveSupport
# semantics `false` is blank and is therefore written as an empty field.
def copy_text_value(value)
  if value.respond_to?(:strftime)
    value.strftime('%Y-%m-%d %H:%M:%S')
  elsif value.respond_to?(:map)
    copy_text_escape(value.to_json)
  elsif value.nil?
    '\N'
  elsif value.blank?
    ''
  else
    copy_text_escape(value.to_s)
  end
end

# Internal helper: backslash-escapes the characters that are significant in
# COPY text format. Backslash itself must be escaped, otherwise the server
# re-interprets sequences such as "\n" or "\N" found in the data; raw
# newlines and carriage returns would otherwise end the row early.
def copy_text_escape(text)
  text.gsub(/[\\\t\n\r]/, '\\' => '\\\\', "\t" => '\t', "\n" => '\n', "\r" => '\r')
end