Class: Embulk::Filter::Unpivot

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/unpivot.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/embulk/filter/unpivot.rb', line 10

# Builds the task hash from the plugin configuration, declares the output
# columns, and yields both to the given block.
#
# The output column list starts with two :string columns named after the
# "outer_key" and "inner_key" settings, followed by one column for every name
# listed in "additional", copied from the input column of the same name.
#
# @param config [#param] configuration, read via param(name, type, default:)
# @param in_schema [Enumerable] input columns; each responds to #name
# @yield [task, out_columns]
# @return [Object] the value returned by the block
# @raise [ArgumentError] if an "additional" name matches no input column
def self.transaction(config, in_schema, &control)
  task = {
    "outer_key"  => config.param("outer_key", :string),
    "inner_key"  => config.param("inner_key", :string),
    "columns"    => config.param("columns", :array, default: []),
    "additional" => config.param("additional", :array, default: []),
  }

  out_columns = [
    Column.new(nil, task["outer_key"], :string),
    Column.new(nil, task["inner_key"], :string),
  ]

  task["additional"].each do |name|
    source = in_schema.find { |sch| sch.name == name }
    # Fail with a readable message instead of a NoMethodError on nil.
    raise ArgumentError, "additional column '#{name}' not found in input schema" if source.nil?

    # Clear the index on a copy: the found object still belongs to in_schema,
    # and resetting its index in place would alter the caller's input schema.
    col = source.dup
    col.index = nil
    out_columns.push(col)
  end

  yield(task, out_columns)
end

Instance Method Details

#add(page) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/embulk/filter/unpivot.rb', line 49

# Emits output rows for every record in the page.
#
# For each input record a row is prepared as
# [outer value, inner value, *additional values]. Then, for every configured
# target column whose cell is neither 0 nor the empty string, the second slot
# is overwritten with that target's "id" and the row is handed to the page
# builder.
#
# @param page [#each] yields records indexable by column position
# @return [Object] the result of page.each
def add(page)
  page.each do |row|
    out_row = [row[@idx_outer], row[@idx_inner]]
    out_row.concat(task["additional"].map { |extra| row[@indexes[extra]] })

    task["columns"].each do |spec|
      cell = row[@indexes[spec["name"]]]
      # Cells equal to 0 or '' produce no output row for this target.
      next if cell == 0 || cell == ''

      # The same array is reused for each emitted row; only slot 1 changes.
      out_row[1] = spec["id"]
      page_builder.add(out_row)
    end
  end
end

#close ⇒ Object



46
47
# File 'lib/embulk/filter/unpivot.rb', line 46

# No-op: takes no arguments, does nothing, and returns nil.
def close
  nil
end

#finish ⇒ Object



68
69
70
# File 'lib/embulk/filter/unpivot.rb', line 68

# Delegates to page_builder.finish and returns its result.
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/embulk/filter/unpivot.rb', line 33

# Resolves and caches the input-schema positions used by this filter.
#
# Sets @idx_outer and @idx_inner from the "outer_key" and "inner_key" task
# settings, and fills @indexes with a name => get_index(name, in_schema)
# entry for every "additional" name and every "columns" entry's "name".
#
# @return [Object] the result of iterating task["columns"]
def init
  locate = ->(column_name) { get_index(column_name, in_schema) }

  @idx_outer = locate.(task["outer_key"])
  @idx_inner = locate.(task["inner_key"])

  @indexes = {}
  task["additional"].each { |extra| @indexes[extra] = locate.(extra) }
  task["columns"].each do |spec|
    key = spec["name"]
    @indexes[key] = locate.(key)
  end
end