Class: Embulk::Filter::RubyProc

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/ruby_proc.rb

Defined Under Namespace

Classes: Evaluator

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.parse_col_procs(columns, evaluator_binding) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/embulk/filter/ruby_proc.rb', line 116

# Compiles the per-column procs declared under the "columns" option.
#
# Each column definition supplies Ruby source either inline ("proc") or via a
# file path ("proc_file"); that source is evaluated in +evaluator_binding+ and
# the result is stored under the column's "name". When two definitions share
# a name, the later one wins.
#
# @param columns [Array<Hash>] column definitions from the task
# @param evaluator_binding [Binding] binding the sources are evaluated in
# @return [Hash] column name => evaluated proc, in declaration order
def self.parse_col_procs(columns, evaluator_binding)
  columns.each_with_object({}) do |col, compiled|
    inline_source = col["proc"]
    compiled[col["name"]] =
      if inline_source
        eval(inline_source, evaluator_binding)
      else
        proc_path = col["proc_file"]
        eval(File.read(proc_path), evaluator_binding, File.expand_path(proc_path))
      end
  end
end

.parse_row_procs(rows, evaluator_binding) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/embulk/filter/ruby_proc.rb', line 126

# Compiles the row-level procs declared under the "rows" option.
#
# Each row definition supplies Ruby source either inline ("proc") or via a
# file path ("proc_file"); that source is evaluated in +evaluator_binding+.
# Definitions whose source evaluates to nil are dropped from the result.
#
# @param rows [Array<Hash>] row definitions from the task
# @param evaluator_binding [Binding] binding the sources are evaluated in
# @return [Array] the non-nil evaluated row procs, in declaration order
def self.parse_row_procs(rows, evaluator_binding)
  rows.each_with_object([]) do |rowdef, compiled|
    inline_source = rowdef["proc"]
    row_proc =
      if inline_source
        eval(inline_source, evaluator_binding)
      else
        proc_path = rowdef["proc_file"]
        eval(File.read(proc_path), evaluator_binding, File.expand_path(proc_path))
      end
    compiled << row_proc unless row_proc.nil?
  end
end

.proc_store ⇒ Object



108
109
110
# File 'lib/embulk/filter/ruby_proc.rb', line 108

# Class-level store of compiled column procs, keyed by transaction id.
# @proc_store is only assigned in .transaction, so this reads nil until a
# transaction has run in this process.
class << self
  attr_reader :proc_store
end

.row_proc_store ⇒ Object



112
113
114
# File 'lib/embulk/filter/ruby_proc.rb', line 112

# Class-level store of compiled row procs, keyed by transaction id.
# @row_proc_store is only assigned in .transaction, so this reads nil until a
# transaction has run in this process.
class << self
  attr_reader :row_proc_store
end

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/embulk/filter/ruby_proc.rb', line 32

# Sets up a filter transaction.
#
# The steps, in order:
# 1. Builds the output schema. A column listed under "columns" may override
#    its "type" and "format".
# 2. Requires the configured libraries.
# 3. Compiles every configured proc once.
# 4. Runs the "before" hooks.
# 5. Yields to the caller.
# 6. Compiles and runs the "after" hooks.
#
# Compiled column/row procs are parked in the class-level stores under a
# random transaction id that is written into the task. This lets #init reuse
# them instead of evaluating the sources again in every filter instance.
# The entries are removed once the transaction finishes (or fails), so the
# stores do not grow across transactions.
#
# @param config configuration object queried through +param+
# @param in_schema input schema; each column responds to index/name/type/format
# @yield [task, out_columns] the task hash and the output column list
# @raise [RuntimeError] when neither "columns" nor "rows" procs are configured
def self.transaction(config, in_schema, &control)
  task = {
    "columns" => config.param("columns", :array, default: []),
    "rows" => config.param("rows", :array, default: []),
    "before" => config.param("before", :array, default: []),
    "after" => config.param("after", :array, default: []),
    "requires" => config.param("requires", :array, default: []),
    "variables" => config.param("variables", :hash, default: {}),
  }

  out_columns = in_schema.map do |col|
    target = task["columns"].find { |filter_col| filter_col["name"] == col.name }
    next col unless target

    # type falls back to the input column's type, so no further default is needed.
    type = target["type"] ? target["type"].to_sym : col.type
    Embulk::Column.new(index: col.index, name: col.name, type: type, format: target["format"] || col.format)
  end

  task["requires"].each { |lib| require lib }

  evaluator_binding = Evaluator.new(task["variables"]).get_binding
  # Evaluates one hook definition: inline "proc" source or a "proc_file" path.
  compile_hook = lambda do |hook|
    if hook["proc"]
      eval(hook["proc"], evaluator_binding)
    else
      eval(File.read(hook["proc_file"]), evaluator_binding, File.expand_path(hook["proc_file"]))
    end
  end

  # In order to avoid multithread problems, compile the procs here, once,
  # rather than in each filter instance.
  before_procs = task["before"].map(&compile_hook)
  procs = parse_col_procs(task["columns"], evaluator_binding)
  row_procs = parse_row_procs(task["rows"], evaluator_binding)
  # Validate before touching the stores so a failed setup leaves nothing behind.
  raise "Need columns or rows parameter" if procs.empty? && row_procs.empty?

  @proc_store ||= {}
  @row_proc_store ||= {}
  transaction_id = rand(100_000_000)
  transaction_id = rand(100_000_000) while @proc_store.key?(transaction_id)
  @proc_store[transaction_id] = procs
  @row_proc_store[transaction_id] = row_procs
  task["transaction_id"] = transaction_id

  begin
    before_procs.each(&:call)

    yield(task, out_columns)

    after_procs = task["after"].map(&compile_hook)
    after_procs.each(&:call)
  ensure
    # The stored procs are only needed while the transaction runs.
    @proc_store.delete(transaction_id)
    @row_proc_store.delete(transaction_id)
  end
end

Instance Method Details

#add(page) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/embulk/filter/ruby_proc.rb', line 157

# Filters one page of records.
#
# Each record is turned into a column-name => value hash via +hashrize+.
# When row procs are configured, every row proc receives its own fresh hash
# and must return a Hash (one row) or an Array (its elements become rows).
# Their results are collected in order. Column procs are then applied to every
# resulting row: a column is skipped when it is absent from the row, or when
# its value is nil and skip_nil is set for it. A one-argument proc receives
# the value; any other arity also receives the whole row. Finally the row's
# values are appended to the page builder.
#
# @param page [Enumerable] incoming records
# @raise [RuntimeError] when a row proc returns neither an Array nor a Hash
def add(page)
  page.each do |record|
    rows =
      if row_procs.empty?
        [hashrize(record)]
      else
        expanded = []
        row_procs.each do |row_proc|
          produced = row_proc.call(hashrize(record))
          if produced.is_a?(Array)
            produced.each { |item| expanded << item }
          elsif produced.is_a?(Hash)
            expanded << produced
          else
            raise "row proc return value must be a Array or Hash"
          end
        end
        expanded
      end

    rows.each do |row|
      procs.each do |name, col_proc|
        next unless row.key?(name)

        current = row[name]
        next if current.nil? && skip_nils[name]

        row[name] = col_proc.arity == 1 ? col_proc.call(current) : col_proc.call(current, row)
      end
      page_builder.add(row.values)
    end
  end
end

#close ⇒ Object



154
155
# File 'lib/embulk/filter/ruby_proc.rb', line 154

# Intentionally a no-op: this filter opens nothing that must be released on
# close. Always returns nil.
def close
  nil
end

#finish ⇒ Object



193
194
195
# File 'lib/embulk/filter/ruby_proc.rb', line 193

# Completes this instance's output by finishing the page builder
# (presumably flushing any buffered rows). Returns whatever
# page_builder.finish returns.
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/embulk/filter/ruby_proc.rb', line 136

# Prepares a filter instance.
#
# Requires the configured libraries, then obtains the compiled column/row
# procs. If the class-level stores hold an entry for this task's
# transaction id (filled by .transaction in the same process), those procs
# are reused. Otherwise the sources are compiled here. That covers stores
# that were never populated in this process, and stores that exist but lack
# this transaction's entry; previously the latter left the procs nil and
# made #add crash.
# Finally builds the per-column skip_nil flags: skipping is on unless
# skip_nil is explicitly false.
def init
  task["requires"].each { |lib| require lib }

  transaction_id = task["transaction_id"]
  col_store = self.class.proc_store
  row_store = self.class.row_proc_store

  if col_store && row_store && col_store.key?(transaction_id) && row_store.key?(transaction_id)
    @procs = col_store[transaction_id]
    @row_procs = row_store[transaction_id]
  else
    evaluator_binding = Evaluator.new(task["variables"]).get_binding
    @procs = self.class.parse_col_procs(task["columns"], evaluator_binding)
    @row_procs = self.class.parse_row_procs(task["rows"], evaluator_binding)
  end

  # nil/missing => true; false => false; any other value => true.
  @skip_nils = Hash[task["columns"].map { |col| [col["name"], col["skip_nil"] != false] }]
end