26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/fluent/plugin/out_vertica.rb', line 26
# Loads one buffer chunk into the permanent table by way of a session-local
# staging table.
#
# Statements issued on +vertica+, in order:
# 1. Create an empty temporary table with the permanent table's shape.
# 2. COPY the chunk into it (tab-delimited, newline-terminated records,
#    +__NULL__+ as the NULL marker, NO COMMIT).
# 3. With no primary keys, append every staged row. Otherwise delete the
#    permanent rows whose keys match a staged row (skipped when
#    +empty_table?+ says the table is empty) and insert one staged row per
#    key combination.
# 4. COMMIT.
#
# @param chunk [#open] buffer chunk; +open+ must yield an object whose
#   +read(length)+ returns nil once the data is exhausted
# @return [Object] whatever +chunk.open+ returns
def write(chunk)
  chunk.open do |file|
    # NOTE(review): reset is defined elsewhere; presumably it refreshes the
    # connection/cached metadata before each chunk -- confirm.
    reset
    temp_table = "temp_#{@table}"
    perm_table = "#{@schema}.#{@table}"

    vertica.query(<<~SQL)
      CREATE LOCAL TEMPORARY TABLE #{temp_table}
      ON COMMIT DELETE ROWS
      AS SELECT * FROM #{perm_table} LIMIT 0
    SQL

    # Built once and reused by every statement below.
    column_list = columns.join(",")

    # The heredoc interpolates, so \t and \n are expanded by Ruby and the
    # server receives literal tab / newline bytes inside E'...'.
    copy_sql = <<~SQL
      COPY #{temp_table} (#{column_list})
      FROM STDIN DELIMITER E'\t'
      RECORD TERMINATOR E'\n' NULL AS '__NULL__'
      ENFORCELENGTH
      NO COMMIT
    SQL

    # Stream in fixed-size reads so a large chunk is never held in memory as
    # a single String.
    copy_block_size = 1024 * 1024
    vertica.copy(copy_sql) do |handle|
      while (data = file.read(copy_block_size))
        handle.write(data)
      end
    end

    keys = primary_keys
    if keys.empty?
      vertica.query(<<~SQL)
        INSERT INTO #{perm_table}
        (#{column_list})
        SELECT
        #{column_list}
        FROM #{temp_table}
      SQL
    else
      unless empty_table?(perm_table)
        key_match = keys.map { |key| "#{perm_table}.#{key} = #{temp_table}.#{key}" }.join(" AND ")
        vertica.query(<<~SQL)
          DELETE FROM #{perm_table}
          WHERE EXISTS (
          SELECT 1
          FROM #{temp_table}
          WHERE #{key_match}
          )
        SQL
      end

      # The window has no ORDER BY, so which of several staged rows sharing
      # a key is kept is left to the database.
      vertica.query(<<~SQL)
        INSERT INTO #{perm_table}
        (#{column_list})
        SELECT
        #{column_list}
        FROM (
        SELECT
        #{column_list},
        row_number() OVER (partition by #{keys.join(",")}) AS r
        FROM #{temp_table}
        ) AS temp
        WHERE r = 1
      SQL
    end

    vertica.query("COMMIT")
  end
end
|