Class: Embulk::Filter::RubyProc

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/ruby_proc.rb

Class Method Summary

Instance Method Summary

Class Method Details

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


(source lines 18–37)
# File 'lib/embulk/filter/ruby_proc.rb', line 18

# Reads the plugin configuration into the task hash and derives the output
# schema handed to the next stage.
#
# Every input column keeps its index and name. A column that is listed under
# `columns` (matched by "name", first match wins) may override its type
# (converted to a Symbol) and its format; unspecified attributes fall back to
# the input column's values. Columns not listed are passed through unchanged
# (the very same object).
#
# @param config [Object] Embulk config source; only #param is used here
# @param in_schema [Enumerable] input columns (each responds to index/name/type/format)
# @yieldparam task [Hash] keys "columns", "rows", "requires", "variables"
# @yieldparam out_columns [Array] the derived output schema
# @return the value returned by the block
def self.transaction(config, in_schema, &control)
  task = {}
  task["columns"] = config.param("columns", :array, default: [])
  task["rows"] = config.param("rows", :array, default: [])
  task["requires"] = config.param("requires", :array, default: [])
  task["variables"] = config.param("variables", :hash, default: {})

  out_columns = in_schema.map do |column|
    override = task["columns"].find { |defn| defn["name"] == column.name }
    next column unless override

    Embulk::Column.new(
      index: column.index,
      name: column.name,
      type: override["type"] ? override["type"].to_sym : column.type,
      format: override["format"] || column.format,
    )
  end

  yield(task, out_columns)
end

Instance Method Details

#add(page) ⇒ Object



(source lines 70–94)
# File 'lib/embulk/filter/ruby_proc.rb', line 70

# Processes one input page and emits the resulting rows to page_builder.
#
# Each record is first expanded by the row procs, when any are configured.
# Every row proc is called with the result of hashrize(record) and may return
# a single hash or an array of hashes. The results of all row procs are
# concatenated. Without row procs, the record passes through as one hash.
# (hashrize is defined elsewhere; presumably it maps the value array to a Hash
# keyed by column name — confirm.)
#
# Column procs are then applied in configuration order to each resulting hash.
# A proc of arity 1 receives the value. Any other arity receives the value and
# the whole record hash. Procs are skipped for absent keys, and for nil values
# when @skip_nils says so.
#
# @param page [Enumerable<Array>] input records as value arrays
def add(page)
  # Output column order; computed once per page rather than once per row.
  column_names = out_schema.map(&:name)

  page.each do |record|
    record_hashes =
      if @row_procs.empty?
        [hashrize(record)]
      else
        # hashrize is called per row proc, so each proc gets its own input
        # (presumably a fresh hash — confirm against hashrize).
        @row_procs.flat_map { |pr| pr.call(hashrize(record)) }
      end

    record_hashes.each do |record_hash|
      @procs.each do |col, pr|
        next unless record_hash.key?(col)
        next if record_hash[col].nil? && @skip_nils[col]

        record_hash[col] =
          if pr.arity == 1
            pr.call(record_hash[col])
          else
            pr.call(record_hash[col], record_hash)
          end
      end

      # Emit values in out_schema order rather than the hash's insertion
      # order, so a row proc that builds a new hash with reordered (or
      # missing) keys still yields a correctly aligned row; missing keys
      # become nil.
      page_builder.add(record_hash.values_at(*column_names))
    end
  end
end

#close ⇒ Object



(source lines 67–68)
# File 'lib/embulk/filter/ruby_proc.rb', line 67

# Lifecycle hook for releasing per-instance resources. This filter opens
# nothing that needs releasing, so the hook is an explicit no-op.
#
# @return [nil]
def close
  nil
end

#finish ⇒ Object



(source lines 96–98)
# File 'lib/embulk/filter/ruby_proc.rb', line 96

# Lifecycle hook (presumably invoked by Embulk after the last #add): delegates
# to the page builder's #finish.
#
# @return whatever page_builder.finish returns
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



(source lines 39–65)
# File 'lib/embulk/filter/ruby_proc.rb', line 39

# Prepares the filter before any page is processed. It requires the configured
# libraries, then compiles every column and row proc definition.
#
# Each definition supplies Ruby source either inline under "proc" or in a file
# named by "proc_file". The source is eval'ed in the binding returned by
# Evaluator.new(task["variables"]).get_binding.
# NOTE(review): this evaluates arbitrary Ruby taken from the plugin
# configuration. That is the plugin's purpose, but the config must be trusted.
#
# Sets:
#   @procs      - column name => compiled proc
#   @row_procs  - compiled row procs, nil results dropped
#   @skip_nils  - column name => whether nil values bypass the column proc
#                 (true when "skip_nil" is absent or nil)
#
# @raise [RuntimeError] when a definition has neither "proc" nor "proc_file",
#   or when neither columns nor rows yield any proc
def init
  task["requires"].each { |lib| require lib }

  evaluator_binding = Evaluator.new(task["variables"]).get_binding

  # Compiles one proc definition; +label+ identifies it in error messages so
  # a malformed entry fails clearly instead of with File.read(nil)'s TypeError.
  compile = lambda do |defn, label|
    if defn["proc"]
      eval(defn["proc"], evaluator_binding)
    elsif defn["proc_file"]
      path = defn["proc_file"]
      eval(File.read(path), evaluator_binding, File.expand_path(path))
    else
      raise "#{label} needs either proc or proc_file"
    end
  end

  @procs = task["columns"].map { |col|
    [col["name"], compile.call(col, "columns entry #{col["name"].inspect}")]
  }.to_h
  @row_procs = task["rows"].each_with_index.map { |rowdef, i|
    compile.call(rowdef, "rows[#{i}]")
  }.compact
  raise "Need columns or rows parameter" if @row_procs.empty? && @procs.empty?

  @skip_nils = task["columns"].map { |col|
    [col["name"], col["skip_nil"].nil? ? true : !!col["skip_nil"]]
  }.to_h
end