Class: Kiba::Plus::Destination::PgBulk2
- Inherits: Object
- Includes: PgBulkUtils, Helper
- Defined in: lib/kiba/plus/destination/pg_bulk2.rb
Instance Attribute Summary
- #conn ⇒ Object (readonly)
  Returns the value of attribute conn.
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
Instance Method Summary
- #close ⇒ Object
- #initialize(options = {}) ⇒ PgBulk2 (constructor)
  A new instance of PgBulk2.
- #write(row) ⇒ Object
Methods included from Helper
#format_sql, #mysql2_connect_hash, #scheme
Constructor Details
#initialize(options = {}) ⇒ PgBulk2
Returns a new instance of PgBulk2.
# File 'lib/kiba/plus/destination/pg_bulk2.rb', line 15
def initialize(options = {})
  @options = options
  @options.assert_valid_keys(
    :connect_url,
    :schema,
    :table_name,
    :columns,
    :truncate,
    :incremental,
    :unique_by
  )
  @conn = PG.connect(connect_url)
  @conn.exec "SET search_path TO %s" % [ options[:schema] ] if options[:schema]
  init
end
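The constructor accepts only the seven option keys listed in the source above. As a rough illustration of that check, the sketch below re-implements the key validation in plain Ruby. `VALID_KEYS`, `validate_options`, and all sample values are hypothetical names chosen for this example; the class itself calls `assert_valid_keys` (assumed here to be the ActiveSupport Hash extension) rather than this helper.

```ruby
# Hypothetical stand-in for the key check performed in PgBulk2#initialize.
VALID_KEYS = %i[
  connect_url schema table_name columns truncate incremental unique_by
].freeze

# Returns the options unchanged, or raises ArgumentError on unknown keys.
def validate_options(options)
  unknown = options.keys - VALID_KEYS
  unless unknown.empty?
    raise ArgumentError, "Unknown key(s): #{unknown.map(&:inspect).join(', ')}"
  end
  options
end

# Sample values are assumptions, not defaults of the library.
options = validate_options(
  connect_url: "postgresql://user:secret@localhost:5432/warehouse",
  schema: "public",
  table_name: "customers",
  columns: [:id, :email],
  incremental: true,
  unique_by: :id
)
puts options.keys.inspect

# A misspelled key is rejected before any connection is attempted.
begin
  validate_options(tabel_name: "customers")
rescue ArgumentError => e
  puts e.message
end
```

Validating keys up front means a typo in the job definition fails at construction time instead of surfacing later as a missing table name or column list.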
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
# File 'lib/kiba/plus/destination/pg_bulk2.rb', line 13
def conn
  @conn
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/kiba/plus/destination/pg_bulk2.rb', line 13
def options
  @options
end
Instance Method Details
#close ⇒ Object
# File 'lib/kiba/plus/destination/pg_bulk2.rb', line 43
def close
  @conn.put_copy_end
  @conn.get_last_result
  if incremental
    merge_to_target_table
    drop_staging_table
  end
rescue
  raise
ensure
  @conn.close
  @conn = nil
end
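The `rescue` / `raise` / `ensure` layout in `#close` means an error while finishing the COPY is re-raised to the caller, yet the connection is still closed. The sketch below reproduces only that control flow. `FakeConn` and `finish_copy` are hypothetical stand-ins written for this example; no database is involved and the incremental merge step is left out.

```ruby
# Hypothetical stand-in for a PG connection; it only records calls.
class FakeConn
  attr_reader :calls

  def initialize(fail_on: nil)
    @fail_on = fail_on
    @calls = []
  end

  %i[put_copy_end get_last_result close].each do |name|
    define_method(name) do
      @calls << name
      raise "#{name} failed" if name == @fail_on
    end
  end
end

# Same control flow as #close, without merge_to_target_table.
def finish_copy(conn)
  conn.put_copy_end
  conn.get_last_result
rescue
  raise        # re-raise the original error unchanged
ensure
  conn.close   # runs on both the success and the failure path
end

ok = FakeConn.new
finish_copy(ok)
p ok.calls      # [:put_copy_end, :get_last_result, :close]

broken = FakeConn.new(fail_on: :get_last_result)
begin
  finish_copy(broken)
rescue RuntimeError => e
  puts "re-raised: #{e.message}"
end
p broken.calls  # [:put_copy_end, :get_last_result, :close]
```

In both runs `close` is the last recorded call, which is the property the `ensure` clause provides.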
#write(row) ⇒ Object
# File 'lib/kiba/plus/destination/pg_bulk2.rb', line 32
def write(row)
  begin
    @conn.put_copy_data CSV.generate_line(row.values_at(*columns))
  rescue Exception => err
    errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
    @conn.put_copy_end( errmsg )
    Kiba::Plus.logger.error @conn.get_result
    raise
  end
end
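Each row passed to `#write` is reduced to the configured columns, in column order, and serialized as one CSV line before being handed to `put_copy_data`. The sketch below shows just that serialization step using Ruby's standard `csv` library. `copy_line` and the sample row are hypothetical, and the surrounding COPY statement is assumed to use CSV format, since it is not shown in the source above.

```ruby
require "csv"

# Hypothetical helper mirroring the expression used inside #write.
def copy_line(row, columns)
  CSV.generate_line(row.values_at(*columns))
end

columns = [:id, :name, :email]  # assumed value of the :columns option
row = { email: nil, name: "Ada, Countess", id: 1, ignored: "x" }

# Keys outside `columns` are dropped, fields follow column order,
# embedded commas are quoted, and nil becomes an empty field.
line = copy_line(row, columns)
puts line.inspect  # "1,\"Ada, Countess\",\n"
```

With PostgreSQL's default CSV settings an unquoted empty field is read as NULL, so a `nil` value in the row would arrive as NULL in the target column under that assumption.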