32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/embulk/filter/ruby_proc.rb', line 32
# Sets up one filter transaction: reads the plugin configuration, derives the
# output schema, builds every configured proc, and runs the pipeline between
# the "before" and "after" hooks.
#
# NOTE: the "proc" / "proc_file" settings are Ruby source that is passed to
# +eval+, so the configuration must come from a trusted author.
#
# @param config [#param] configuration object; read through
#   +param(name, type, default:)+ for "columns", "rows", "before", "after",
#   "requires" and "variables".
# @param in_schema [Enumerable] input columns; each must respond to +index+,
#   +name+, +type+ and +format+.
# @yield [task, out_columns] runs the rest of the pipeline.
# @yieldparam task [Hash] the collected settings plus "transaction_id".
# @yieldparam out_columns [Array] input columns, with type/format overridden
#   for every column named in the "columns" setting.
# @return [Array<Proc>] the evaluated "after" procs (value of the final +each+).
# @raise [RuntimeError] when neither a column proc nor a row proc is
#   configured, or when a definition has neither "proc" nor "proc_file".
def self.transaction(config, in_schema, &control)
  task = {
    "columns" => config.param("columns", :array, default: []),
    "rows" => config.param("rows", :array, default: []),
    "before" => config.param("before", :array, default: []),
    "after" => config.param("after", :array, default: []),
    "requires" => config.param("requires", :array, default: []),
    "variables" => config.param("variables", :hash, default: {}),
  }

  out_columns = in_schema.map do |col|
    target = task["columns"].find { |filter_col| filter_col["name"] == col.name }
    next col unless target

    # An explicit "type"/"format" in the config wins; otherwise keep the input's.
    type = target["type"] ? target["type"].to_sym : col.type
    Embulk::Column.new(
      index: col.index,
      name: col.name,
      type: type,
      format: target["format"] || col.format
    )
  end

  # Load user libraries first so the procs evaluated below can reference them.
  task["requires"].each { |lib| require lib }

  @proc_store ||= {}
  @row_proc_store ||= {}

  # Pick a random id that is not already used as a key in the proc store.
  transaction_id = rand(100000000)
  transaction_id = rand(100000000) while @proc_store.key?(transaction_id)

  evaluator_binding = Evaluator.new(task["variables"]).get_binding

  before_procs = task["before"].map do |before|
    evaluate_proc_definition(before, evaluator_binding, "before")
  end
  procs = task["columns"].map do |col|
    [col["name"], evaluate_proc_definition(col, evaluator_binding, "columns")]
  end.to_h
  # Row definitions that evaluate to nil are dropped.
  row_procs = task["rows"].map do |rowdef|
    evaluate_proc_definition(rowdef, evaluator_binding, "rows")
  end.compact

  raise "Need columns or rows parameter" if procs.empty? && row_procs.empty?

  # Publish the procs only after validation so a failed setup leaves no
  # orphaned entries behind in the stores.
  @proc_store[transaction_id] = procs
  @row_proc_store[transaction_id] = row_procs
  task["transaction_id"] = transaction_id

  before_procs.each(&:call)

  yield(task, out_columns)

  # "after" definitions are evaluated only once the pipeline has finished.
  after_procs = task["after"].map do |after|
    evaluate_proc_definition(after, evaluator_binding, "after")
  end
  after_procs.each(&:call)
end

# Evaluates one proc definition from the configuration inside the shared
# evaluator binding. Inline "proc" source takes precedence over "proc_file";
# file-based source is evaluated with its absolute path as the file name.
#
# @api private
# @param definition [Hash] entry holding a "proc" or "proc_file" key.
# @param evaluator_binding [Binding] binding the source is evaluated in.
# @param section [String] name of the config section, used in the error text.
# @return [Object] whatever the evaluated source returns.
# @raise [RuntimeError] when the definition has neither "proc" nor "proc_file".
def self.evaluate_proc_definition(definition, evaluator_binding, section)
  if definition["proc"]
    eval(definition["proc"], evaluator_binding)
  elsif definition["proc_file"]
    path = definition["proc_file"]
    eval(File.read(path), evaluator_binding, File.expand_path(path))
  else
    raise "Need proc or proc_file parameter in #{section}"
  end
end
|