Class: Embulk::Filter::RubyProc

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/ruby_proc.rb

Defined Under Namespace

Classes: Evaluator

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.parse_col_procs(columns, evaluator_binding) ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/embulk/filter/ruby_proc.rb', line 129

# Compiles the per-column procs declared in the "columns" configuration.
#
# Each entry must carry either "proc" (Ruby source evaluated as-is) or
# "proc_file" (path of a file whose contents are evaluated; the expanded path
# is passed to eval as the file name so backtraces point at it). All sources
# are evaluated in +evaluator_binding+.
#
# NOTE: the sources are executed with full privileges; they come from the
# plugin configuration, which therefore must be trusted.
#
# @param columns [Array<Hash>] column definitions, each with a "name" key
# @param evaluator_binding [Binding] binding the sources are evaluated in
# @return [Hash] column name => evaluated value (normally a proc or lambda)
# @raise [RuntimeError] if an entry has neither "proc" nor "proc_file"
def self.parse_col_procs(columns, evaluator_binding)
  Hash[columns.map { |col|
    proc_file = col["proc_file"]
    compiled =
      if col["proc"]
        eval(col["proc"], evaluator_binding)
      elsif proc_file
        eval(File.read(proc_file), evaluator_binding, File.expand_path(proc_file))
      else
        # Previously this fell through to File.read(nil) and a cryptic TypeError.
        raise "column #{col["name"].inspect} needs either \"proc\" or \"proc_file\""
      end
    [col["name"], compiled]
  }]
end

.parse_row_procs(rows, evaluator_binding) ⇒ Object



139
140
141
142
143
144
145
146
147
# File 'lib/embulk/filter/ruby_proc.rb', line 139

# Compiles row-level procs (used for both the "rows" and "skip_rows" settings).
#
# Each entry must carry either "proc" (Ruby source evaluated as-is) or
# "proc_file" (path of a file whose contents are evaluated; the expanded path
# is passed to eval as the file name). Entries whose source evaluates to nil
# are dropped from the result.
#
# NOTE: the sources are executed with full privileges; they come from the
# plugin configuration, which therefore must be trusted.
#
# @param rows [Array<Hash>] row definitions
# @param evaluator_binding [Binding] binding the sources are evaluated in
# @return [Array] evaluated values (normally procs or lambdas), nils removed
# @raise [RuntimeError] if an entry has neither "proc" nor "proc_file"
def self.parse_row_procs(rows, evaluator_binding)
  rows.map { |rowdef|
    proc_file = rowdef["proc_file"]
    if rowdef["proc"]
      eval(rowdef["proc"], evaluator_binding)
    elsif proc_file
      eval(File.read(proc_file), evaluator_binding, File.expand_path(proc_file))
    else
      # Previously this fell through to File.read(nil) and a cryptic TypeError.
      raise "each rows/skip_rows entry needs either \"proc\" or \"proc_file\""
    end
  }.compact
end

.proc_store ⇒ Object



117
118
119
# File 'lib/embulk/filter/ruby_proc.rb', line 117

class << self
  # Column procs compiled by .transaction, keyed by transaction_id
  # (transaction_id => { column name => proc }). nil until .transaction
  # has initialized it in this process.
  attr_reader :proc_store
end

.row_proc_store ⇒ Object



121
122
123
# File 'lib/embulk/filter/ruby_proc.rb', line 121

class << self
  # Row procs compiled by .transaction, keyed by transaction_id
  # (transaction_id => [proc, ...]). nil until .transaction has
  # initialized it in this process.
  attr_reader :row_proc_store
end

.skip_row_proc_store ⇒ Object



125
126
127
# File 'lib/embulk/filter/ruby_proc.rb', line 125

class << self
  # skip_rows procs compiled by .transaction, keyed by transaction_id
  # (transaction_id => [proc, ...]). nil until .transaction has
  # initialized it in this process.
  attr_reader :skip_row_proc_store
end

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/embulk/filter/ruby_proc.rb', line 32

# Builds the task and output schema, compiles every configured proc, runs the
# "before" hooks, yields to Embulk, and then runs the "after" hooks.
#
# Compiled column/row/skip-row procs are published in the class-level stores
# under a random transaction_id, which is also written into the task, so that
# #init can reuse them instead of re-evaluating the sources. The store entries
# are removed once the yield returns (or raises), so repeated transactions in
# one process do not accumulate compiled procs.
#
# @param config plugin configuration, read via +config.param+
# @param in_schema input schema columns
# @yieldparam task [Hash] task settings, including "transaction_id"
# @yieldparam out_columns [Array] input columns, with "type"/"format" overridden
#   for columns listed in "columns"
# @return [Array] the evaluated "after" hooks (same value as before)
# @raise [RuntimeError] if no columns, rows or skip_rows procs are configured
def self.transaction(config, in_schema, &control)
  task = {
    "columns" => config.param("columns", :array, default: []),
    "rows" => config.param("rows", :array, default: []),
    "skip_rows" => config.param("skip_rows", :array, default: []),
    "before" => config.param("before", :array, default: []),
    "after" => config.param("after", :array, default: []),
    "requires" => config.param("requires", :array, default: []),
    "variables" => config.param("variables", :hash, default: {}),
  }

  # Columns listed in "columns" may override their output type and format.
  out_columns = in_schema.map do |col|
    target = task["columns"].find { |filter_col| filter_col["name"] == col.name }
    next col unless target

    type = target["type"] ? target["type"].to_sym : col.type
    Embulk::Column.new(index: col.index, name: col.name, type: type, format: target["format"] || col.format)
  end

  task["requires"].each do |lib|
    require lib
  end

  @proc_store ||= {}
  @row_proc_store ||= {}
  @skip_row_proc_store ||= {}
  # Pick an id not used by another transaction still registered in this process.
  transaction_id = rand(100000000)
  transaction_id = rand(100000000) while @proc_store.key?(transaction_id)

  evaluator_binding = Evaluator.new(task["variables"]).get_binding

  # Evaluates one before/after hook definition ("proc" source or "proc_file" path).
  compile_hook = lambda do |definition|
    if definition["proc"]
      eval(definition["proc"], evaluator_binding)
    elsif definition["proc_file"]
      eval(File.read(definition["proc_file"]), evaluator_binding, File.expand_path(definition["proc_file"]))
    else
      raise "each before/after entry needs either \"proc\" or \"proc_file\""
    end
  end

  # Compile all procs once here, before any task runs, to avoid multithreading
  # problems; #init looks the compiled procs up by transaction_id.
  before_procs = task["before"].map(&compile_hook)
  procs = parse_col_procs(task["columns"], evaluator_binding)
  row_procs = parse_row_procs(task["rows"], evaluator_binding)
  skip_row_procs = parse_row_procs(task["skip_rows"], evaluator_binding)
  # Validate before publishing, so a bad config leaves nothing in the stores.
  raise "Need columns or rows parameter" if procs.empty? && row_procs.empty? && skip_row_procs.empty?

  @proc_store[transaction_id] = procs
  @row_proc_store[transaction_id] = row_procs
  @skip_row_proc_store[transaction_id] = skip_row_procs
  task["transaction_id"] = transaction_id

  begin
    before_procs.each(&:call)
    yield(task, out_columns)
  ensure
    # NOTE(review): relies on Embulk's control contract that all tasks (and
    # their #init) run before the yield returns; afterwards the entries are
    # only dead weight.
    @proc_store.delete(transaction_id)
    @row_proc_store.delete(transaction_id)
    @skip_row_proc_store.delete(transaction_id)
  end

  # "after" hooks are evaluated only once the tasks have finished.
  after_procs = task["after"].map(&compile_hook)
  after_procs.each(&:call)
end

Instance Method Details

#add(page) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/embulk/filter/ruby_proc.rb', line 172

# Applies the configured procs to every record of +page+ and emits the results.
#
# Each input record is first expanded by the row procs, or passed through as a
# single hash when there are none. A row proc must return a Hash (one output)
# or an Array (its elements are all emitted); anything else raises. Each
# resulting hash is then dropped if any skip_rows proc returns truthy.
# Otherwise it is transformed column by column and written to page_builder as
# its values in the hash's own key order.
#
# Any proc may `throw :skip_record`. From a row proc it discards only that
# proc's output; from a skip_rows or column proc it discards the whole hash.
def add(page)
  page.each do |record|
    expanded =
      if row_procs.empty?
        [hashrize(record)]
      else
        row_procs.each_with_object([]) do |row_proc, acc|
          # Every row proc receives its own freshly built hash.
          catch(:skip_record) do
            produced = row_proc.call(hashrize(record))
            case produced
            when Array then acc.concat(produced)
            when Hash then acc << produced
            else raise "row proc return value must be a Array or Hash"
            end
          end
        end
      end

    expanded.each do |row_hash|
      catch(:skip_record) do
        throw :skip_record if skip_row_procs.any? { |skip_proc| skip_proc.call(row_hash) }

        procs.each do |name, col_proc|
          next unless row_hash.key?(name)

          current = row_hash[name]
          next if current.nil? && skip_nils[name]

          # One-argument procs get only the value; others also get the whole row.
          row_hash[name] = col_proc.arity == 1 ? col_proc.call(current) : col_proc.call(current, row_hash)
        end

        page_builder.add(row_hash.values)
      end
    end
  end
end

#close ⇒ Object



169
170
# File 'lib/embulk/filter/ruby_proc.rb', line 169

# Intentionally a no-op: this filter releases nothing when it is closed.
def close
  nil
end

#finish ⇒ Object



216
217
218
# File 'lib/embulk/filter/ruby_proc.rb', line 216

# Finishes the page builder that #add writes rows into and returns its result.
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/embulk/filter/ruby_proc.rb', line 149

# Prepares the per-task proc tables that #add uses.
#
# Loads the configured "requires". It then reuses the procs that .transaction
# compiled for this task's transaction_id when they are present in the
# class-level stores. Otherwise it compiles them again from the task settings.
# This covers two cases: the stores were never initialized in this process, or
# they hold no entry for this transaction_id (previously that left @procs nil,
# so #add crashed). Finally it builds @skip_nils.
def init
  task["requires"].each do |lib|
    require lib
  end

  transaction_id = task["transaction_id"]
  stores = [self.class.proc_store, self.class.row_proc_store, self.class.skip_row_proc_store]
  @procs, @row_procs, @skip_row_procs = stores.map { |store| store && store[transaction_id] }

  if @procs.nil? || @row_procs.nil? || @skip_row_procs.nil?
    evaluator_binding = Evaluator.new(task["variables"]).get_binding
    @procs = self.class.parse_col_procs(task["columns"], evaluator_binding)
    @row_procs = self.class.parse_row_procs(task["rows"], evaluator_binding)
    @skip_row_procs = self.class.parse_row_procs(task["skip_rows"], evaluator_binding)
  end

  # skip_nil defaults to true; only an explicit `false` disables it.
  @skip_nils = Hash[task["columns"].map { |col|
    [col["name"], col["skip_nil"] != false]
  }]
end