Class: Embulk::Output::Vertica
- Inherits: OutputPlugin
- Ancestor chain: Object → OutputPlugin → Embulk::Output::Vertica
- Defined in:
- lib/embulk/output/vertica.rb,
lib/embulk/output/vertica/output_thread.rb,
lib/embulk/output/vertica/value_converter_factory.rb
Defined Under Namespace
Classes: CommitError, DequeueTimeoutError, Error, FinishTimeoutError, NotSupportedType, OutputThread, OutputThreadPool, TimeoutError, ValueConverterFactory, WriteTimeoutError
Class Method Summary collapse
- .thread_pool ⇒ Object
- .transaction(config, schema, task_count, &control) ⇒ Object
- .transaction_report(jv, task, task_reports) ⇒ Object
Instance Method Summary collapse
- #abort ⇒ Object
-
#add(page) ⇒ Object
called for each page in each thread.
-
#close ⇒ Object
called for each page in each thread.
-
#commit ⇒ Object
called after processing all pages in each thread; the commit is done in #transaction for all pools, not here.
- #finish ⇒ Object
-
#initialize(task, schema, index) ⇒ Vertica
constructor
instance is created on each thread.
Constructor Details
#initialize(task, schema, index) ⇒ Vertica
instance is created on each thread
166 167 168 |
# File 'lib/embulk/output/vertica.rb', line 166 def initialize(task, schema, index) super end |
Class Method Details
.thread_pool ⇒ Object
14 15 16 |
# File 'lib/embulk/output/vertica.rb', line 14 def self.thread_pool @thread_pool ||= @thread_pool_proc.call end |
.transaction(config, schema, task_count, &control) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/embulk/output/vertica.rb', line 35 def self.transaction(config, schema, task_count, &control) task = { 'host' => config.param('host', :string, :default => 'localhost'), 'port' => config.param('port', :integer, :default => 5433), 'user' => config.param('user', :string, :default => nil), 'username' => config.param('username', :string, :default => nil), # alias to :user for backward compatibility 'password' => config.param('password', :string, :default => ''), 'database' => config.param('database', :string, :default => 'vdb'), 'schema' => config.param('schema', :string, :default => 'public'), 'table' => config.param('table', :string), 'mode' => config.param('mode', :string, :default => 'insert'), 'copy_mode' => config.param('copy_mode', :string, :default => 'AUTO'), 'abort_on_error' => config.param('abort_on_error', :bool, :default => false), 'compress' => config.param('compress', :string, :default => 'UNCOMPRESSED'), 'default_timezone' => config.param('default_timezone', :string, :default => 'UTC'), 'column_options' => config.param('column_options', :hash, :default => {}), 'json_payload' => config.param('json_payload', :bool, :default => false), 'resource_pool' => config.param('resource_pool', :string, :default => nil), 'reject_on_materialized_type_error' => config.param('reject_on_materialized_type_error', :bool, :default => false), 'pool' => config.param('pool', :integer, :default => task_count), 'write_timeout' => config.param('write_timeout', :integer, :default => nil), # like 11 * 60 sec 'dequeue_timeout' => config.param('dequeue_timeout', :integer, :default => nil), # like 13 * 60 sec 'finish_timeout' => config.param('finish_timeout', :integer, :default => nil), # like 3 * 60 sec } @thread_pool_proc = Proc.new do OutputThreadPool.new(task, schema, task['pool']) end task['user'] ||= task['username'] unless task['user'] raise ConfigError.new 'required field "user" is not set' end task['mode'] = task['mode'].upcase unless %w[INSERT REPLACE 
DROP_INSERT].include?(task['mode']) raise ConfigError.new "`mode` must be one of INSERT, REPLACE, DROP_INSERT" end task['copy_mode'] = task['copy_mode'].upcase unless %w[AUTO DIRECT TRICKLE].include?(task['copy_mode']) raise ConfigError.new "`copy_mode` must be one of AUTO, DIRECT, TRICKLE" end # ToDo: Support BZIP, LZO task['compress'] = task['compress'].upcase unless %w[GZIP UNCOMPRESSED].include?(task['compress']) raise ConfigError.new "`compress` must be one of GZIP, UNCOMPRESSED" end now = Time.now unique_name = SecureRandom.uuid task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}" quoted_schema = ::Jvertica.quote_identifier(task['schema']) quoted_table = ::Jvertica.quote_identifier(task['table']) quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table']) connect(task) do |jv| unless task['json_payload'] # ToDo: auto table creation is not supported to json_payload mode yet sql_schema_table = self.sql_schema_from_embulk_schema(schema, task['column_options']) # create the target table query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_table}]) if task['mode'] == 'DROP_INSERT' query(jv, %[CREATE TABLE IF NOT EXISTS #{quoted_schema}.#{quoted_table} (#{sql_schema_table})]) end # create a temp table query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}]) if task['mode'] == 'REPLACE' # In the case of replace mode, this temp table is replaced with the original table. 
So, projections should also be copied query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} LIKE #{quoted_schema}.#{quoted_table} INCLUDING PROJECTIONS]) else query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} LIKE #{quoted_schema}.#{quoted_table}]) # Create internal vertica projection beforehand, otherwirse, parallel copies lock table to create a projection and we get S Lock error sometimes # This is a trick to create internal vertica projection query(jv, %[INSERT INTO #{quoted_schema}.#{quoted_temp_table} SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 0]) end Embulk.logger.trace { result = query(jv, %[SELECT EXPORT_OBJECTS('', '#{task['schema']}.#{task['temp_table']}')]) # You can see `CREATE PROJECTION` if the table has a projection "embulk-output-vertica: #{result.to_a.flatten}" } end begin # insert data into the temp table thread_pool.start yield(task) task_reports = thread_pool.commit Embulk.logger.info { "embulk-output-vertica: task_reports: #{task_reports.to_json}" } connect(task) do |jv| transaction_report = self.transaction_report(jv, task, task_reports) Embulk.logger.info { "embulk-output-vertica: transaction_report: #{transaction_report.to_json}" } if task['abort_on_error'] # double-meaning, also used for COPY statement if transaction_report['num_input_rows'] != transaction_report['num_output_rows'] raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \ "`num_output_rows (#{transaction_report['num_output_rows']})` does not match" end end if task['mode'] == 'REPLACE' # swap table and drop the old table quoted_old_table = ::Jvertica.quote_identifier("#{task['table']}_LOAD_OLD_#{unique_name}") from = "#{quoted_schema}.#{quoted_table},#{quoted_schema}.#{quoted_temp_table}" to = "#{quoted_old_table},#{quoted_table}" query(jv, %[ALTER TABLE #{from} RENAME TO #{to}]) query(jv, %[DROP TABLE #{quoted_schema}.#{quoted_old_table}]) else # insert select from the temp table hint = '/*+ direct */ ' if 
task['copy_mode'] == 'DIRECT' # I did not prepare a specific option, does anyone want? query(jv, %[INSERT #{hint}INTO #{quoted_schema}.#{quoted_table} SELECT * FROM #{quoted_schema}.#{quoted_temp_table}]) jv.commit end end ensure connect(task) do |jv| # clean up the temp table query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}]) Embulk.logger.trace { "embulk-output-vertica: select result\n#{query(jv, %[SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 10]).map {|row| row.to_h }.join("\n") rescue nil}" } end end # this is for -o next_config option, add some paramters for next time execution if wants next_config_diff = {} return next_config_diff end |
.transaction_report(jv, task, task_reports) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/embulk/output/vertica.rb', line 18 def self.transaction_report(jv, task, task_reports) quoted_schema = ::Jvertica.quote_identifier(task['schema']) quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table']) num_input_rows = task_reports.map {|report| report['num_input_rows'].to_i }.inject(:+) num_response_rows = task_reports.map {|report| report['num_output_rows'].to_i }.inject(:+) result = query(jv, %[SELECT COUNT(*) FROM #{quoted_schema}.#{quoted_temp_table}]) num_output_rows = result.map {|row| row.values }.flatten.first.to_i num_rejected_rows = num_input_rows - num_output_rows transaction_report = { 'num_input_rows' => num_input_rows, 'num_response_rows' => num_response_rows, 'num_output_rows' => num_output_rows, 'num_rejected_rows' => num_rejected_rows, } end |
Instance Method Details
#abort ⇒ Object
182 183 |
# File 'lib/embulk/output/vertica.rb', line 182 def abort end |
#add(page) ⇒ Object
called for each page in each thread
175 176 177 |
# File 'lib/embulk/output/vertica.rb', line 175 def add(page) self.class.thread_pool.enqueue(page) end |
#close ⇒ Object
called for each page in each thread
171 172 |
# File 'lib/embulk/output/vertica.rb', line 171 def close end |
#commit ⇒ Object
called after processing all pages in each thread; the commit is done in #transaction for all pools, not here
187 188 189 |
# File 'lib/embulk/output/vertica.rb', line 187 def commit {} end |
#finish ⇒ Object
179 180 |
# File 'lib/embulk/output/vertica.rb', line 179 def finish end |