12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/embulk/output/vertica.rb', line 12
# Drives one complete output run against Vertica.
#
# Steps, in order:
# 1. Build the task hash from +config+ and validate `user`, `mode` and
#    `copy_mode`.
# 2. Prepare the target table (dropped first when mode is REPLACE, then
#    created if it does not exist).
# 3. Create a uniquely named temp table whose column definition comes from
#    +sql_schema_from_table+.
# 4. Yield the task to the caller's block, log the returned task reports,
#    then INSERT the temp table's rows into the target table and commit.
# 5. Always drop the temp table afterwards, whether or not step 4 raised.
#
# @param config [#param] plugin configuration; every setting is read through
#   +config.param(name, type, default: ...)+
# @param schema [Object] Embulk schema, handed to
#   +sql_schema_from_embulk_schema+ to build the target table definition
# @param processor_count [Integer] not used by this method
# @yieldparam task [Hash] the resolved task settings, including 'temp_table'
# @yieldreturn [Object] task reports; only logged here (via +to_json+)
# @return [Hash] the next config diff, always empty
# @raise [ConfigError] when neither `user` nor `username` is set, or when
#   `mode` / `copy_mode` is not one of the accepted values
def self.transaction(config, schema, processor_count, &control)
  task = {
    'host' => config.param('host', :string, default: 'localhost'),
    'port' => config.param('port', :integer, default: 5433),
    'user' => config.param('user', :string, default: nil),
    'username' => config.param('username', :string, default: nil),
    'password' => config.param('password', :string, default: ''),
    'database' => config.param('database', :string, default: 'vdb'),
    'schema' => config.param('schema', :string, default: 'public'),
    'table' => config.param('table', :string),
    'mode' => config.param('mode', :string, default: 'insert'),
    'copy_mode' => config.param('copy_mode', :string, default: 'AUTO'),
    'abort_on_error' => config.param('abort_on_error', :bool, default: false),
    'default_timezone' => config.param('default_timezone', :string, default: 'UTC'),
    'column_options' => config.param('column_options', :hash, default: {}),
    'reject_on_materialized_type_error' => config.param('reject_on_materialized_type_error', :bool, default: false),
  }

  # `username` is accepted as a fallback spelling of `user`.
  task['user'] ||= task['username']
  raise ConfigError, 'required field "user" is not set' unless task['user']

  # Both enumerated options are matched case-insensitively and stored upcased.
  task['mode'] = task['mode'].upcase
  unless %w[INSERT REPLACE].include?(task['mode'])
    raise ConfigError, "`mode` must be one of INSERT, REPLACE"
  end

  task['copy_mode'] = task['copy_mode'].upcase
  unless %w[AUTO DIRECT TRICKLE].include?(task['copy_mode'])
    raise ConfigError, "`copy_mode` must be one of AUTO, DIRECT, TRICKLE"
  end

  # The temp table name embeds the current time (seconds + nanoseconds, hex)
  # so that separate runs use different names.
  now = Time.now
  unique_name = "%08x%08x" % [now.tv_sec, now.tv_nsec]
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}"

  quoted_schema     = ::Jvertica.quote_identifier(task['schema'])
  quoted_table      = ::Jvertica.quote_identifier(task['table'])
  quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table'])

  sql_schema_table = self.sql_schema_from_embulk_schema(schema, task['column_options'])

  # Prepare the target table.
  connect(task) do |jv|
    query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_table}]) if task['mode'] == 'REPLACE'
    query(jv, %[CREATE TABLE IF NOT EXISTS #{quoted_schema}.#{quoted_table} (#{sql_schema_table})])
  end

  # Must run after the target table exists: the temp table definition is
  # derived from the task (which names the target table), not from `schema`.
  sql_schema_temp_table = self.sql_schema_from_table(task)

  connect(task) do |jv|
    query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
    query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} (#{sql_schema_temp_table})])
  end

  begin
    # Hand control to the caller; it returns the task reports.
    task_reports = yield(task)
    Embulk.logger.info { "embulk-output-vertica: task_reports: #{task_reports.to_json}" }

    # Move everything from the temp table into the target table.
    connect(task) do |jv|
      query(jv, %[INSERT INTO #{quoted_schema}.#{quoted_table} SELECT * FROM #{quoted_schema}.#{quoted_temp_table}])
      jv.commit
    end
  ensure
    connect(task) do |jv|
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
      Embulk.logger.debug do
        # Best-effort preview of the target table; a failure here must not
        # break the run, so any StandardError yields an empty preview.
        preview = begin
          query(jv, %[SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 10]).map(&:to_h).join("\n")
        rescue StandardError
          nil
        end
        "embulk-output-vertica: #{preview}"
      end
    end
  end

  # No configuration is carried over to the next run.
  {}
end
|