Class: Embulk::Filter::RubyProc
- Inherits: FilterPlugin
  - Object
    - FilterPlugin
      - Embulk::Filter::RubyProc
- Defined in: lib/embulk/filter/ruby_proc.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/embulk/filter/ruby_proc.rb', line 7
#
# Builds the plugin task from the configuration and derives the output schema.
#
# Reads the required "columns" array and the optional "requires" array
# (default: []) from +config+. Every input column whose name matches a
# "columns" entry is rebuilt as an Embulk::Column that takes its type and
# format from that entry when present and from the input column otherwise.
# Columns without a matching entry are passed through unchanged.
#
# @param config [#param] plugin configuration
# @param in_schema [#map] input columns; each responds to index, name, type and format
# @yield [task, out_columns] the task hash and the derived output columns
# @return [Object] the value returned by the block
def self.transaction(config, in_schema, &control)
  task = {
    "columns" => config.param("columns", :array),
    "requires" => config.param("requires", :array, default: []),
  }

  # Index the filter definitions by name once instead of scanning the list for
  # every schema column; `||=` keeps the first entry when a name is repeated.
  filters_by_name = task["columns"].each_with_object({}) do |filter_col, by_name|
    by_name[filter_col["name"]] ||= filter_col
  end

  out_columns = in_schema.map do |col|
    target = filters_by_name[col.name]
    next col unless target

    Embulk::Column.new(
      index: col.index,
      name: col.name,
      type: target["type"] ? target["type"].to_sym : col.type,
      format: target["format"] || col.format
    )
  end

  yield(task, out_columns)
end
Instance Method Details
#add(page) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/embulk/filter/ruby_proc.rb', line 41
#
# Runs the configured procs over every record of +page+ and hands the
# resulting values to the page builder.
#
# For each record, a column is transformed only when the record hash contains
# that column and the value is not a nil that @skip_nils marks as skippable.
# A proc with arity 1 receives just the value; any other proc also receives
# the whole record hash, which already reflects transformations applied to
# earlier columns of the same record.
#
# NOTE(review): `hashrize` and `page_builder` are defined outside this
# listing; `hashrize` presumably maps column names to values - confirm.
#
# @param page [#each] records to filter
# @return [Object] the result of +page.each+
def add(page)
  page.each do |record|
    record_hash = hashrize(record)

    @procs.each do |col, pr|
      next unless record_hash.key?(col)
      next if record_hash[col].nil? && @skip_nils[col]

      record_hash[col] =
        if pr.arity == 1
          pr.call(record_hash[col])
        else
          pr.call(record_hash[col], record_hash)
        end
    end

    page_builder.add(record_hash.values)
  end
end
#close ⇒ Object
38 39 |
# File 'lib/embulk/filter/ruby_proc.rb', line 38
#
# Empty method: performs no work and returns nil.
#
# @return [nil]
def close
end
#finish ⇒ Object
58 59 60 |
# File 'lib/embulk/filter/ruby_proc.rb', line 58
#
# Delegates to the page builder's +finish+.
#
# NOTE(review): `page_builder` is defined outside this listing.
#
# @return [Object] the value returned by +page_builder.finish+
def finish
  page_builder.finish
end
#init ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/embulk/filter/ruby_proc.rb', line 26
#
# Prepares the filter from the task: loads every library listed under
# task["requires"], then builds two lookups keyed by column name:
#
#   @procs     - the object obtained by evaluating the column's "proc" source
#   @skip_nils - whether nil values bypass the proc; true unless "skip_nil"
#                is explicitly false
#
# SECURITY: each "proc" string is passed to eval and therefore runs as
# arbitrary Ruby inside this process. Only use configuration from a trusted
# source.
#
# NOTE(review): `task` is defined outside this listing.
#
# @return [Hash] the @skip_nils lookup (value of the last assignment)
def init
  # Required at runtime on purpose: the library list is only known from the task.
  task["requires"].each { |lib| require lib }

  @procs = task["columns"].map { |col| [col["name"], eval(col["proc"])] }.to_h

  # nil (option absent) and every truthy value mean "skip nils"; only an
  # explicit false turns skipping off.
  @skip_nils = task["columns"].map { |col| [col["name"], col["skip_nil"] != false] }.to_h
end