Class: Embulk::PostgresJsonOutputPlugin

Inherits:
OutputPlugin
  • Object
show all
Defined in:
lib/embulk/output/postgres_json.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task, schema, index) ⇒ PostgresJsonOutputPlugin

Returns a new instance of PostgresJsonOutputPlugin.



77
78
79
80
# File 'lib/embulk/output/postgres_json.rb', line 77

# Builds a plugin instance and opens a PostgreSQL connection for it.
#
# @param task [Hash] task settings; also handed to .connect
# @param schema [Object] forwarded unchanged to the superclass initializer
# @param index [Object] forwarded unchanged to the superclass initializer
# NOTE(review): #add reads @task, which is not assigned here — presumably the
# superclass initializer stores it; confirm against OutputPlugin.
def initialize(task, schema, index)
  # Same arguments a bare `super` would forward; none of them is reassigned.
  super(task, schema, index)
  # Kept in @pg: #add prepares statements on it and #close closes it.
  @pg = self.class.connect(task)
end

Class Method Details

.connect(task) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/embulk/output/postgres_json.rb', line 51

# Opens a JDBC connection to the PostgreSQL database described by +task+.
#
# Without a block the open connection is returned and the caller must close it.
# With a block the connection is yielded and closed once the block finishes
# (also when it raises); the connection object is still returned, but it has
# already been closed at that point.
#
# @param task [Hash] reads 'host', 'port', 'database', 'username', 'password'
# @yieldparam pg the open connection
# @return the connection object produced by the PostgreSQL JDBC driver
def self.connect(task)
  jdbc_url = format('jdbc:postgresql://%s:%s/%s', task['host'], task['port'], task['database'])

  credentials = java.util.Properties.new
  credentials.put("user", task['username'])
  credentials.put("password", task['password'])

  connection = org.postgresql.Driver.new.connect(jdbc_url, credentials)
  return connection unless block_given?

  begin
    yield connection
  ensure
    # Runs on both normal exit and exception so the connection never leaks.
    connection.close
  end
  connection
end

.execute_sql(pg, sql, *args) ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/embulk/output/postgres_json.rb', line 68

# Runs a single SQL string on +pg+ through a throwaway statement.
#
# The statement is closed whether or not execution raises.
#
# @param pg connection object responding to +createStatement+
# @param sql [String] SQL text, passed to the statement as-is
# @param args accepted for call compatibility but not used: nothing is bound
#   to the statement
# @return whatever the statement's +execute+ returns
def self.execute_sql(pg, sql, *args)
  statement = pg.createStatement
  outcome = nil
  begin
    outcome = statement.execute(sql)
  ensure
    statement.close
  end
  outcome
end

.transaction(config, schema, processor_count, &control) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/embulk/output/postgres_json.rb', line 8

# Runs one load: creates a temporary table, yields the task so the caller can
# fill it, copies the rows into the destination table, and always drops the
# temporary table afterwards.
#
# Each step opens its own connection via .connect.
#
# @param config object responding to +param(name, type, default: ...)+
# @param schema not used by this method
# @param processor_count not used by this method
# @param control the block; it is invoked through +yield+ with the task Hash
# @return [Hash] always an empty Hash
def self.transaction(config, schema, processor_count, &control)
  task = {}
  task['host'] = config.param('host', :string)
  task['port'] = config.param('port', :integer, default: 5432)
  task['username'] = config.param('username', :string)
  task['password'] = config.param('password', :string, default: '')
  task['database'] = config.param('database', :string)
  task['table'] = config.param('table', :string)
  task['column'] = config.param('column', :string, default: 'json')
  task['column_type'] = config.param('column_type', :string, default: 'json')

  # Temp table name is made unique from the current time (seconds + nanoseconds, hex).
  # TODO add org.embulk.spi.ExecSession.getTransactionUniqueName() method
  stamp = Time.now
  suffix = format("%08x%08x", stamp.tv_sec, stamp.tv_nsec)
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{suffix}"

  connect(task) do |pg|
    temp, column, type = task.values_at('temp_table', 'column', 'column_type')
    # drop table if exists "TEMP"
    execute_sql(pg, %[drop table if exists "#{temp}"])
    # create table "TEMP" ("COL" TYPE)
    execute_sql(pg, %[create table "#{temp}" ("#{column}" #{type})])
  end

  begin
    yield(task)

    connect(task) do |pg|
      dest, temp, column, type = task.values_at('table', 'temp_table', 'column', 'column_type')
      # create table if not exists "DEST" ("COL" TYPE)
      execute_sql(pg, %[create table if not exists "#{dest}" ("#{column}" #{type})])
      # insert into "DEST" ("COL") select "COL" from "TEMP"
      execute_sql(pg, %[insert into "#{dest}" ("#{column}") select "#{column}" from "#{temp}"])
    end
  ensure
    # Runs after success and after failure alike, so the temp table is not left behind.
    connect(task) do |pg|
      execute_sql(pg, %[drop table if exists "#{task['temp_table']}"])
    end
  end

  {}
end

Instance Method Details

#abort ⇒ Object



101
102
# File 'lib/embulk/output/postgres_json.rb', line 101

# Does nothing and returns nil.
def abort
  nil
end

#add(page) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/embulk/output/postgres_json.rb', line 86

# Inserts every record of +page+ into the temporary load table, one row per
# record, storing the record's JSON text cast to the configured column type.
#
# Reads 'temp_table', 'column' and 'column_type' from @task and uses the
# connection in @pg. The prepared statement is closed even if an insert raises.
#
# @param page [#each] records; each record must respond to +to_json+
# @return whatever +page.each+ returns
def add(page)
  # The column identifier is double-quoted exactly as .transaction quotes it
  # when creating the temp table. Unquoted, PostgreSQL would fold it to lower
  # case and a mixed-case or reserved-word column name would not match.
  insert_sql = %[insert into "#{@task['temp_table']}" ("#{@task['column']}") values (?::#{@task['column_type']})]
  prep = @pg.prepareStatement(insert_sql)
  begin
    page.each do |record|
      prep.setString(1, record.to_json)
      prep.execute
    end
  ensure
    prep.close
  end
end

#close ⇒ Object



82
83
84
# File 'lib/embulk/output/postgres_json.rb', line 82

# Closes the connection held in @pg.
#
# @return whatever the connection's +close+ returns
def close
  connection = @pg
  connection.close
end

#commit ⇒ Object



104
105
106
# File 'lib/embulk/output/postgres_json.rb', line 104

# Reads and changes no state.
#
# @return [Hash] a new, empty Hash on every call
def commit
  empty_report = {}
  empty_report
end

#finish ⇒ Object



98
99
# File 'lib/embulk/output/postgres_json.rb', line 98

# Does nothing and returns nil.
def finish
  nil
end