Class: Embulk::Filter::RubyProc
- Inherits:
-
FilterPlugin
- Object
- FilterPlugin
- Embulk::Filter::RubyProc
- Defined in:
- lib/embulk/filter/ruby_proc.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/embulk/filter/ruby_proc.rb', line 18 def self.transaction(config, in_schema, &control) task = { "columns" => config.param("columns", :array, default: []), "rows" => config.param("rows", :array, default: []), "requires" => config.param("requires", :array, default: []), "variables" => config.param("variables", :hash, default: {}), } out_columns = in_schema.map do |col| target = task["columns"].find { |filter_col| filter_col["name"] == col.name } if target type = target["type"] ? target["type"].to_sym : col.type Embulk::Column.new(index: col.index, name: col.name, type: type || col.type, format: target["format"] || col.format) else col end end yield(task, out_columns) end |
Instance Method Details
#add(page) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/embulk/filter/ruby_proc.rb', line 70 def add(page) page.each do |record| if @row_procs.empty? record_hashes = [hashrize(record)] else record_hashes = @row_procs.flat_map do |pr| pr.call(hashrize(record)) end end record_hashes.each do |record_hash| @procs.each do |col, pr| next unless record_hash.has_key?(col) next if record_hash[col].nil? && @skip_nils[col] if pr.arity == 1 record_hash[col] = pr.call(record_hash[col]) else record_hash[col] = pr.call(record_hash[col], record_hash) end end page_builder.add(record_hash.values) end end end |
#close ⇒ Object
67 68 |
# File 'lib/embulk/filter/ruby_proc.rb', line 67 def close end |
#finish ⇒ Object
96 97 98 |
# File 'lib/embulk/filter/ruby_proc.rb', line 96 def finish page_builder.finish end |
#init ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/embulk/filter/ruby_proc.rb', line 39 def init task["requires"].each do |lib| require lib end evaluator_binding = Evaluator.new(task["variables"]).get_binding @procs = Hash[task["columns"].map {|col| if col["proc"] [col["name"], eval(col["proc"], evaluator_binding)] else [col["name"], eval(File.read(col["proc_file"]), evaluator_binding, File.(col["proc_file"]))] end }] @row_procs = task["rows"].map {|rowdef| if rowdef["proc"] eval(rowdef["proc"], evaluator_binding) else eval(File.read(rowdef["proc_file"]), evaluator_binding, File.(rowdef["proc_file"])) end }.compact raise "Need columns or rows parameter" if @row_procs.empty? && @procs.empty? @skip_nils = Hash[task["columns"].map {|col| [col["name"], col["skip_nil"].nil? ? true : !!col["skip_nil"]] }] end |