Class: Embulk::Filter::RubyProc

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/ruby_proc.rb

Defined Under Namespace

Classes: Evaluator

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.page_proc_store ⇒ Object



136
137
138
# File 'lib/embulk/filter/ruby_proc.rb', line 136

# Reader for the class-level @page_proc_store instance variable.
#
# self.transaction initializes it to a Hash and stores the compiled "pages"
# procs in it under the generated transaction id. It is nil until
# self.transaction has run on this class object.
def self.page_proc_store
  @page_proc_store
end

.parse_col_procs(columns, evaluator_binding) ⇒ Object



144
145
146
147
148
149
150
151
152
# File 'lib/embulk/filter/ruby_proc.rb', line 144

# Compiles the per-column proc definitions into a Hash keyed by column name.
#
# Each definition either carries its Ruby source inline under "proc", or names
# a file under "proc_file" whose content is evaluated instead (the expanded
# file path is handed to eval so backtraces point at the file).
#
# NOTE(review): the source is eval'ed as-is, so the filter configuration must
# be trusted.
#
# @param columns [Array<Hash>] column definitions ("name" plus "proc" or "proc_file")
# @param evaluator_binding [Binding] binding every source string is evaluated in
# @return [Hash] column name => evaluated object
def self.parse_col_procs(columns, evaluator_binding)
  columns.each_with_object({}) do |column, procs_by_name|
    inline_source = column["proc"]
    procs_by_name[column["name"]] =
      if inline_source
        eval(inline_source, evaluator_binding)
      else
        script_path = column["proc_file"]
        eval(File.read(script_path), evaluator_binding, File.expand_path(script_path))
      end
  end
end

.parse_page_procs(pages, evaluator_binding) ⇒ Object



164
165
166
167
168
169
170
171
172
# File 'lib/embulk/filter/ruby_proc.rb', line 164

# Compiles the page-level proc definitions, in order, dropping any definition
# whose source evaluates to nil.
#
# A definition supplies Ruby source inline under "proc"; otherwise the file
# named by "proc_file" is read and evaluated, with its expanded path passed to
# eval as the file name.
#
# @param pages [Array<Hash>] page proc definitions
# @param evaluator_binding [Binding] binding every source string is evaluated in
# @return [Array] evaluated objects, without nils
def self.parse_page_procs(pages, evaluator_binding)
  pages.each_with_object([]) do |page_def, handlers|
    handler =
      if page_def["proc"]
        eval(page_def["proc"], evaluator_binding)
      else
        proc_file = page_def["proc_file"]
        eval(File.read(proc_file), evaluator_binding, File.expand_path(proc_file))
      end
    # Only nil is discarded (same as Array#compact); false would be kept.
    handlers << handler unless handler.nil?
  end
end

.parse_row_procs(rows, evaluator_binding) ⇒ Object



154
155
156
157
158
159
160
161
162
# File 'lib/embulk/filter/ruby_proc.rb', line 154

# Compiles row-level proc definitions, preserving their order and discarding
# definitions whose source evaluates to nil.
#
# Inline source is taken from "proc"; when that is absent the file named by
# "proc_file" is read and evaluated under its expanded path.
#
# @param rows [Array<Hash>] row proc definitions
# @param evaluator_binding [Binding] binding every source string is evaluated in
# @return [Array] evaluated objects, without nils
def self.parse_row_procs(rows, evaluator_binding)
  compiled = []
  rows.each do |definition|
    source = definition["proc"]
    callable =
      if source
        eval(source, evaluator_binding)
      else
        script = definition["proc_file"]
        eval(File.read(script), evaluator_binding, File.expand_path(script))
      end
    # Mirrors Array#compact: nil results are skipped, everything else is kept.
    compiled << callable unless callable.nil?
  end
  compiled
end

.proc_store ⇒ Object



128
129
130
# File 'lib/embulk/filter/ruby_proc.rb', line 128

# Reader for the class-level @proc_store instance variable.
#
# self.transaction initializes it to a Hash and stores the compiled column
# procs (column name => proc) in it under the generated transaction id. It is
# nil until self.transaction has run on this class object.
def self.proc_store
  @proc_store
end

.row_proc_store ⇒ Object



132
133
134
# File 'lib/embulk/filter/ruby_proc.rb', line 132

# Reader for the class-level @row_proc_store instance variable.
#
# self.transaction initializes it to a Hash and stores the compiled "rows"
# procs in it under the generated transaction id. It is nil until
# self.transaction has run on this class object.
def self.row_proc_store
  @row_proc_store
end

.skip_row_proc_store ⇒ Object



140
141
142
# File 'lib/embulk/filter/ruby_proc.rb', line 140

# Reader for the class-level @skip_row_proc_store instance variable.
#
# self.transaction initializes it to a Hash and stores the compiled
# "skip_rows" procs in it under the generated transaction id. It is nil until
# self.transaction has run on this class object.
def self.skip_row_proc_store
  @skip_row_proc_store
end

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/embulk/filter/ruby_proc.rb', line 32

# Builds the task and the output schema, compiles every configured proc once,
# and runs the rest of the transaction via the given block.
#
# Steps, in order:
# 1. Read the filter parameters from +config+ into the task Hash.
# 2. Derive the output columns: a column listed under "columns" may override
#    its type and format; all other columns pass through unchanged.
# 3. Require the libraries listed under "requires".
# 4. Compile the before/column/row/page/skip_row procs in one shared
#    Evaluator binding. They are compiled here, before the block runs, in
#    order to avoid multithread problems; the compiled procs are published in
#    the class-level stores under a transaction id that is also written to
#    task["transaction_id"] so that #init can look them up.
# 5. Call the "before" procs, yield (task, out_columns), then compile and
#    call the "after" procs.
#
# NOTE(review): all proc sources are eval'ed as-is, so the configuration must
# be trusted.
#
# @param config [#param] filter configuration
# @param in_schema [Enumerable] input columns (each responds to index, name, type, format)
# @yield [task, out_columns] continues the transaction
# @raise [RuntimeError] when no column, row, page or skip_row proc is configured
def self.transaction(config, in_schema, &control)
  task = {
    "columns" => config.param("columns", :array, default: []),
    "rows" => config.param("rows", :array, default: []),
    "pages" => config.param("pages", :array, default: []),
    "skip_rows" => config.param("skip_rows", :array, default: []),
    "before" => config.param("before", :array, default: []),
    "after" => config.param("after", :array, default: []),
    "requires" => config.param("requires", :array, default: []),
    "variables" => config.param("variables", :hash, default: {}),
  }

  out_columns = in_schema.map do |col|
    target = task["columns"].find { |filter_col| filter_col["name"] == col.name }
    next col unless target

    type = target["type"] ? target["type"].to_sym : col.type
    Embulk::Column.new(index: col.index, name: col.name, type: type, format: target["format"] || col.format)
  end

  task["requires"].each do |lib|
    require lib
  end

  @proc_store ||= {}
  @row_proc_store ||= {}
  @page_proc_store ||= {}
  @skip_row_proc_store ||= {}

  # Draw ids until one is found that is not already used by another transaction.
  transaction_id = rand(100_000_000)
  transaction_id = rand(100_000_000) while @proc_store.key?(transaction_id)
  task["transaction_id"] = transaction_id

  evaluator_binding = Evaluator.new(task["variables"]).get_binding

  # before/after hooks are not compacted, unlike row/page procs, so they are
  # compiled here instead of going through parse_row_procs.
  compile_hook = lambda do |hook|
    if hook["proc"]
      eval(hook["proc"], evaluator_binding)
    else
      eval(File.read(hook["proc_file"]), evaluator_binding, File.expand_path(hook["proc_file"]))
    end
  end

  before_procs = task["before"].map { |hook| compile_hook.call(hook) }
  procs = parse_col_procs(task["columns"], evaluator_binding)
  row_procs = parse_row_procs(task["rows"], evaluator_binding)
  page_procs = parse_page_procs(task["pages"], evaluator_binding)
  skip_row_procs = parse_row_procs(task["skip_rows"], evaluator_binding)

  # Validate before publishing, so a rejected configuration leaves no stale
  # entries behind in the class-level stores.
  if procs.empty? && row_procs.empty? && page_procs.empty? && skip_row_procs.empty?
    raise "Need columns or rows or pages parameter"
  end

  @proc_store[transaction_id] = procs
  @row_proc_store[transaction_id] = row_procs
  @page_proc_store[transaction_id] = page_procs
  @skip_row_proc_store[transaction_id] = skip_row_procs

  before_procs.each do |pr|
    pr.call
  end

  yield(task, out_columns)

  # "after" sources are only evaluated once the block has returned.
  after_procs = task["after"].map { |hook| compile_hook.call(hook) }
  after_procs.each do |pr|
    pr.call
  end
end

Instance Method Details

#add(page) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/embulk/filter/ruby_proc.rb', line 199

# Transforms every record of +page+ and hands the results to page_builder.
#
# Per input record:
# * Without row procs the record is converted once with hashrize. Otherwise
#   every row proc receives its own fresh hashrize(record) and may return a
#   Hash (one output record) or an Array (several); the results of all row
#   procs are concatenated. A row proc that throws :skip_record contributes
#   nothing.
# * Each resulting record is dropped as soon as one skip_row proc returns a
#   truthy value, or when a column proc throws :skip_record.
# * Column procs run only for columns present in the record; nil values are
#   left alone when skip_nils is set for the column. Procs with arity 1 get
#   the value, all others get (value, record).
# * Without page procs the record's values are added right away; otherwise
#   the record is buffered for the page procs.
#
# After the loop each page proc is called with the buffered records, and every
# record it returns is added to page_builder.
#
# @param page [#each] records of the incoming page
# @raise [RuntimeError] when a row proc returns neither an Array nor a Hash
def add(page)
  buffered = []

  page.each do |record|
    expanded =
      if row_procs.empty?
        [hashrize(record)]
      else
        row_procs.each_with_object([]) do |row_proc, collected|
          catch(:skip_record) do
            produced = row_proc.call(hashrize(record))
            if Array === produced
              collected.concat(produced)
            elsif Hash === produced
              collected << produced
            else
              raise "row proc return value must be a Array or Hash"
            end
          end
        end
      end

    expanded.each do |row|
      catch(:skip_record) do
        # any? stops at the first truthy result, like the explicit throw did.
        throw :skip_record if skip_row_procs.any? { |skip_proc| skip_proc.call(row) }

        procs.each do |column, column_proc|
          next unless row.key?(column)

          value = row[column]
          next if value.nil? && skip_nils[column]

          row[column] = column_proc.arity == 1 ? column_proc.call(value) : column_proc.call(value, row)
        end

        if page_procs.empty?
          page_builder.add(row.values)
        else
          buffered << row
        end
      end
    end
  end

  return if page_procs.empty?

  page_procs.each do |page_proc|
    page_proc.call(buffered).each { |row| page_builder.add(row.values) }
  end
end

#close ⇒ Object



196
197
# File 'lib/embulk/filter/ruby_proc.rb', line 196

# Intentionally empty: this method performs no work and returns nil.
def close
end

#finish ⇒ Object



255
256
257
# File 'lib/embulk/filter/ruby_proc.rb', line 255

# Delegates to page_builder.finish and returns its result.
def finish
  page_builder.finish
end

#init ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/embulk/filter/ruby_proc.rb', line 174

# Prepares the per-instance procs used by #add.
#
# Requires the libraries listed in task["requires"], then obtains the column,
# row, page and skip_row procs:
#
# * When all four class-level stores hold an entry for
#   task["transaction_id"], the already compiled procs are reused.
# * Otherwise (a store is nil, or it has no entry for this transaction id)
#   the procs are compiled from the task definition in a fresh Evaluator
#   binding. Previously a store without the id silently produced nil procs.
#
# Finally builds @skip_nils (column name => Boolean); skip_nil defaults to
# true and is only false when the column sets it to false.
def init
  task["requires"].each do |lib|
    require lib
  end

  transaction_id = task["transaction_id"]
  stores = [
    self.class.proc_store,
    self.class.row_proc_store,
    self.class.page_proc_store,
    self.class.skip_row_proc_store,
  ]

  if stores.all? { |store| store && store.key?(transaction_id) }
    @procs, @row_procs, @page_procs, @skip_row_procs = stores.map { |store| store[transaction_id] }
  else
    # All four kinds are compiled together so they share one binding.
    evaluator_binding = Evaluator.new(task["variables"]).get_binding
    @procs = self.class.parse_col_procs(task["columns"], evaluator_binding)
    @row_procs = self.class.parse_row_procs(task["rows"], evaluator_binding)
    @page_procs = self.class.parse_page_procs(task["pages"], evaluator_binding)
    @skip_row_procs = self.class.parse_row_procs(task["skip_rows"], evaluator_binding)
  end

  @skip_nils = Hash[task["columns"].map {|col|
    skip_nil = col["skip_nil"]
    [col["name"], skip_nil.nil? ? true : !!skip_nil]
  }]
end