66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/fluent/plugin/out_mysql_select_insert.rb', line 66
# Builds a single INSERT ... SELECT statement from the records in +chunk+
# and executes it on a freshly created client.
#
# @param chunk an object that yields (time, record) pairs from
#   +msgpack_each+ and exposes +metadata+
# @return the result of +client.close+ (the last expression evaluated)
# @raise [Fluent::UnrecoverableError] when the Mysql2 error message starts
#   with "Column count doesn't match value count"
# @raise [Mysql2::Error] any other Mysql2 error is re-raised unchanged
def write(chunk)
client = new_client
# Collect one escaped, single-quoted literal per record; these become the
# members of the IN (...) list below. The +time+ block argument is unused.
condition_values = []
chunk.msgpack_each do |time, record|
condition_values << "'#{client.escape(record[condition_key])}'"
end
# Statement shape: INSERT [IGNORE] INTO `table` [(columns)] <select_query>
# WHERE <condition_column> IN (<collected values>).
# "IGNORE" and the column list are interpolated only when +ignore+ /
# +inserted_columns+ are truthy; otherwise they interpolate as empty strings.
# NOTE(review): if no record was yielded, the list is empty and the statement
# contains "IN ()" — confirm whether an empty chunk can reach this method.
sql = <<~SQL
INSERT #{"IGNORE" if ignore} INTO `#{table}` #{"(#{inserted_columns.join(",")})" if inserted_columns}
#{select_query}
WHERE #{condition_column} IN (#{condition_values.join(",")})
SQL
# NOTE(review): the next two lines are missing identifiers in this listing
# (the receiver of `[0]`, `.empty?` and `[1..-1]`, and the method called with
# `(c, chunk.metadata)`), so they are not valid Ruby as shown. Presumably the
# receiver is an array whose first element is an extra SQL condition and whose
# remaining elements are values to bind — TODO confirm against the original
# file before relying on this.
sql << " AND (#{[0]})" unless .empty?
bound_params = [1..-1]&.map { |c| (c, chunk.metadata) }
begin
# With no bound parameters the statement is sent as-is; otherwise it is sent
# through +xquery+ together with the parameter list.
if bound_params.nil? || bound_params.empty?
client.query(sql)
else
# Loaded lazily: only required on the path that calls +xquery+.
require "mysql2-cs-bind"
client.xquery(sql, bound_params)
end
rescue Mysql2::Error => e
# A column-count mismatch is converted to Fluent::UnrecoverableError
# (presumably so the chunk is not retried — verify against Fluentd's docs);
# every other Mysql2 error propagates unchanged.
if e.message.start_with?("Column count doesn't match value count")
raise Fluent::UnrecoverableError, "#{e.class}: #{e}"
end
raise
end
# NOTE(review): this line is reached only when the query did not raise; on
# both raising paths above the client is left unclosed (no +ensure+).
client.close
end
|