12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/embulk/output/vertica.rb', line 12
# Runs one output transaction against Vertica.
#
# Reads the plugin options into a task hash, validates `mode` and `copy_mode`,
# creates a uniquely named temporary table, yields the task to the caller's
# block, and afterwards inserts the temporary table's rows into the target
# table. The temporary table is dropped whether or not the block succeeded.
#
# NOTE(review): in REPLACE mode the target table is dropped before the block
# runs, so a failure inside the block leaves no target table behind - confirm
# this is intended.
#
# @param config [#param] plugin configuration; each option is read via
#   `config.param(name, type, :default => ...)`
# @param schema [Object] input schema, passed through to `to_sql_schema`
# @param processor_count [Integer] not used by this method
# @yieldparam task [Hash] the resolved task, including 'temp_table'
# @return [Hash] always an empty hash
# @raise [ConfigError] if `mode` or `copy_mode` is not one of the allowed values
def self.transaction(config, schema, processor_count, &control)
  task = {
    'host'             => config.param('host', :string, :default => 'localhost'),
    'port'             => config.param('port', :integer, :default => 5433),
    'username'         => config.param('username', :string),
    'password'         => config.param('password', :string, :default => ''),
    'database'         => config.param('database', :string, :default => 'vdb'),
    'schema'           => config.param('schema', :string, :default => 'public'),
    'table'            => config.param('table', :string),
    'mode'             => config.param('mode', :string, :default => 'insert'),
    'copy_mode'        => config.param('copy_mode', :string, :default => 'AUTO'),
    'abort_on_error'   => config.param('abort_on_error', :bool, :default => false),
    'default_timezone' => config.param('default_timezone', :string, :default => 'UTC'),
    'column_options'   => config.param('column_options', :hash, :default => {}),
    'reject_on_materialized_type_error' => config.param('reject_on_materialized_type_error', :bool, :default => false),
  }

  # String#upcase! returns nil when the receiver is already upper case, which
  # made values such as 'INSERT' or the default 'AUTO' fail validation.
  # Normalize with the non-destructive form and store the result instead; this
  # also leaves the string handed out by `config` untouched.
  task['mode'] = task['mode'].upcase
  unless %w[INSERT REPLACE].include?(task['mode'])
    raise ConfigError, "`mode` must be one of INSERT, REPLACE"
  end

  task['copy_mode'] = task['copy_mode'].upcase
  unless %w[AUTO DIRECT TRICKLE].include?(task['copy_mode'])
    raise ConfigError, "`copy_mode` must be one of AUTO, DIRECT, TRICKLE"
  end

  # Seconds and nanoseconds of the current time, in hex, make the temporary
  # table name unique per transaction.
  now = Time.now
  unique_name = "%08x%08x" % [now.tv_sec, now.tv_nsec]
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}"

  sql_schema        = self.to_sql_schema(schema, task['column_options'])
  quoted_schema     = ::Jvertica.quote_identifier(task['schema'])
  quoted_table      = ::Jvertica.quote_identifier(task['table'])
  quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table'])

  connect(task) do |jv|
    if task['mode'] == 'REPLACE'
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_table}])
    end
    query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
    query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} (#{sql_schema})])
  end

  begin
    # The caller's block receives the task (including 'temp_table').
    yield(task)

    connect(task) do |jv|
      query(jv, %[CREATE TABLE IF NOT EXISTS #{quoted_schema}.#{quoted_table} (#{sql_schema})])
      query(jv, %[INSERT INTO #{quoted_schema}.#{quoted_table} SELECT * FROM #{quoted_schema}.#{quoted_temp_table}])
      jv.commit
    end
  ensure
    connect(task) do |jv|
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
      # Best-effort debug sample of the target table; any error raised while
      # building the message is deliberately swallowed.
      Embulk.logger.debug { "embulk-output-vertica: #{query(jv, %[SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 10]).map {|row| row.to_h }.join("\n") rescue nil}" }
    end
  end

  return {}
end
|