Class: Embulk::Filter::EvalFilterPlugin

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/eval.rb

Defined Under Namespace

Classes: NotFoundOutColumn

Constant Summary collapse

VERSION =
'0.1.0'

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.out_schema(out_columns, in_schema) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/embulk/filter/eval.rb', line 22

# Builds the output schema for the filter.
#
# Every name in +out_columns+ is looked up in +in_schema+ and re-emitted as a
# new Embulk::Column whose index is the name's position in +out_columns+.
#
# @param out_columns [Array<String>] column names to emit, in output order
# @param in_schema [Enumerable] input columns; each must respond to #name,
#   #type and #format
# @return [Object] +in_schema+ itself when +out_columns+ is empty, otherwise
#   an Array of Embulk::Column
# @raise [NotFoundOutColumn] when a requested name is absent from +in_schema+
def self.out_schema(out_columns, in_schema)
  schema = out_columns.map.with_index do |name, i|
    column = in_schema.find { |candidate| candidate.name == name }

    # NotFoundOutColumn is the error class declared under this plugin.
    raise NotFoundOutColumn, "Not found output schema: `#{name}'" unless column

    Embulk::Column.new(index: i, name: column.name, type: column.type, format: column.format)
  end
  schema.empty? ? in_schema : schema
end

.transaction(config, in_schema) {|task, out_schema| ... } ⇒ Object

Yields:



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/embulk/filter/eval.rb', line 10

# Reads the plugin settings from +config+, resolves the output schema and
# yields both to the caller's block.
#
# @param config [#param] queried for the "eval_columns" and "out_columns" arrays
# @param in_schema [Object] input schema, forwarded to .out_schema
# @yield [task, out_schema] the task hash and the resolved output schema
# @return [Object] whatever the block returns
def self.transaction(config, in_schema, &control)
  # Both settings are arrays that default to an empty list.
  task = %w[eval_columns out_columns].each_with_object({}) do |key, settings|
    settings[key] = config.param(key, :array, default: [])
  end

  resolved_schema = out_schema(task["out_columns"], in_schema)
  yield(task, resolved_schema)
end

Instance Method Details

#add(page) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/embulk/filter/eval.rb', line 42

# Applies the configured expressions to every record of +page+ and passes the
# resulting rows to the page builder.
#
# For each column of a record, the first entry of @table that has the column
# name as a key supplies a Ruby expression string. A truthy expression is
# evaluated with Kernel#eval and its result replaces the column value; columns
# without an expression keep their value.
#
# NOTE(review): expressions are evaluated in this method's binding, so they can
# see the locals +value+, +key+, +record+ and +result+. Those names are kept
# unchanged for that reason.
# NOTE(review): eval runs arbitrary code taken from the task configuration —
# confirm that configuration is always trusted.
#
# @param page [#each] records to process; each one is passed to #hash_record
# @return [Object] the return value of page.each
def add(page)
  page.each do |record|
    begin
      record = hash_record(record)

      result = {}

      record.each do |key, value|
        source = @table.find do |t|
          t.key?(key)
        end

        # Rebinds +source+ from the matching entry to its expression string.
        if source && (source = source[key])
          result[key] = eval(source)
        else
          result[key] = value
        end
      end

      page_builder.add(result.values)
    rescue StandardError => e
      # A failing record is still skipped, but the failure is now reported
      # instead of being swallowed silently.
      Embulk.logger.warn("Skipped record: #{e.class}: #{e.message}")
    end
  end
end

#close ⇒ Object



39
40
# File 'lib/embulk/filter/eval.rb', line 39

# Intentionally empty: performs no work and returns nil.
def close; end

#finish ⇒ Object



67
68
69
# File 'lib/embulk/filter/eval.rb', line 67

# Delegates to page_builder.finish.
#
# @return [Object] whatever page_builder.finish returns
def finish
  builder = page_builder
  builder.finish
end

#hash_record(record) ⇒ Object



71
72
73
# File 'lib/embulk/filter/eval.rb', line 71

# Pairs the input schema's column names with the values of +record+.
#
# @param record [Array] values, positionally aligned with in_schema.names
# @return [Hash] column name => value; names without a matching value map to nil
def hash_record(record)
  in_schema.names.zip(record).each_with_object({}) do |(column_name, column_value), row|
    row[column_name] = column_value
  end
end

#init ⇒ Object



35
36
37
# File 'lib/embulk/filter/eval.rb', line 35

# Stores the task's "eval_columns" setting in @table.
#
# @return [Object] the value assigned to @table
def init
  eval_columns = task["eval_columns"]
  @table = eval_columns
end