Class: Embulk::PostgresJsonOutputPlugin
- Inherits:
-
OutputPlugin
- Object
- OutputPlugin
- Embulk::PostgresJsonOutputPlugin
- Defined in:
- lib/embulk/output/postgres_json.rb
Class Method Summary collapse
- .connect(task) ⇒ Object
- .execute_sql(pg, sql, *args) ⇒ Object
- .transaction(config, schema, processor_count, &control) ⇒ Object
Instance Method Summary collapse
- #abort ⇒ Object
- #add(page) ⇒ Object
- #close ⇒ Object
- #commit ⇒ Object
- #finish ⇒ Object
-
#initialize(task, schema, index) ⇒ PostgresJsonOutputPlugin
constructor
A new instance of PostgresJsonOutputPlugin.
Constructor Details
#initialize(task, schema, index) ⇒ PostgresJsonOutputPlugin
Returns a new instance of PostgresJsonOutputPlugin.
77 78 79 80 |
# File 'lib/embulk/output/postgres_json.rb', line 77 def initialize(task, schema, index) super @pg = self.class.connect(task) end |
Class Method Details
.connect(task) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/embulk/output/postgres_json.rb', line 51 def self.connect(task) url = "jdbc:postgresql://#{task['host']}:#{task['port']}/#{task['database']}" props = java.util.Properties.new props.put("user", task['username']) props.put("password", task['password']) pg = org.postgresql.Driver.new.connect(url, props) if block_given? begin yield pg ensure pg.close end end pg end |
.execute_sql(pg, sql, *args) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/embulk/output/postgres_json.rb', line 68 def self.execute_sql(pg, sql, *args) stmt = pg.createStatement begin stmt.execute(sql) ensure stmt.close end end |
.transaction(config, schema, processor_count, &control) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/embulk/output/postgres_json.rb', line 8 def self.transaction(config, schema, processor_count, &control) task = { 'host' => config.param('host', :string), 'port' => config.param('port', :integer, default: 5432), 'username' => config.param('username', :string), 'password' => config.param('password', :string, default: ''), 'database' => config.param('database', :string), 'table' => config.param('table', :string), 'column' => config.param('column', :string, default: 'json'), 'column_type' => config.param('column_type', :string, default: 'json'), } now = Time.now unique_name = "%08x%08x" % [now.tv_sec, now.tv_nsec] # TODO add org.embulk.spi.ExecSession.getTransactionUniqueName() method task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}" connect(task) do |pg| # drop table if exists "DEST" # create table if exists "TEMP" ("COL" json) execute_sql(pg, %[drop table if exists "#{task['temp_table']}"]) execute_sql(pg, %[create table "#{task['temp_table']}" ("#{task['column']}" #{task['column_type']})]) end begin yield(task) connect(task) do |pg| # create table if not exists "DEST" ("COL" json) # insert into "DEST" ("COL") select "COL" from "TEMP" execute_sql(pg, %[create table if not exists "#{task['table']}" ("#{task['column']}" #{task['column_type']})]) execute_sql(pg, %[insert into "#{task['table']}" ("#{task['column']}") select "#{task['column']}" from "#{task['temp_table']}"]) end ensure connect(task) do |pg| # drop table if exists TEMP execute_sql(pg, %[drop table if exists "#{task['temp_table']}"]) end end return {} end |
Instance Method Details
#abort ⇒ Object
101 102 |
# File 'lib/embulk/output/postgres_json.rb', line 101 def abort end |
#add(page) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/embulk/output/postgres_json.rb', line 86 def add(page) prep = @pg.prepareStatement(%[insert into "#{@task['temp_table']}" (#{@task['column']}) values (?::#{@task['column_type']})]) begin page.each do |record| prep.setString(1, record.to_json) prep.execute end ensure prep.close end end |
#close ⇒ Object
82 83 84 |
# File 'lib/embulk/output/postgres_json.rb', line 82 def close @pg.close end |
#commit ⇒ Object
104 105 106 |
# File 'lib/embulk/output/postgres_json.rb', line 104 def commit {} end |
#finish ⇒ Object
98 99 |
# File 'lib/embulk/output/postgres_json.rb', line 98 def finish end |