Class: Embulk::Filter::Insert

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/insert.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.get_column(column_hash) ⇒ Object

Returns a column hash of the form `{ :name => name, :value => value, :type => type }`.



(source lines 79–123)
# File 'lib/embulk/filter/insert.rb', line 79

# Builds a column definition from one entry of the "column"/"columns" config.
#
# An entry holds exactly one name/value pair, plus an optional "as" key that
# names the column type (default: string), e.g.
#   { "user_id" => "1", "as" => "long" }
#
# @param column_hash [Hash] the config entry
# @return [Hash] { :name => name, :value => converted value, :type => type symbol };
#   a nil value is kept as nil (the plugin then inserts null)
# @raise [ArgumentError] if the entry is malformed or the type is unsupported
def self.get_column(column_hash)
  # Exactly one name/value pair, optionally accompanied by "as".
  unless (1..2).cover?(column_hash.size)
    raise ArgumentError, "Invalid column parameter: #{column_hash.to_s}"
  end

  # default type is string
  type = :string

  if column_hash.size == 2
    unless column_hash.key?("as")
      raise ArgumentError, "Invalid column parameter: #{column_hash.to_s}"
    end
    type = column_hash["as"].to_sym
    column_hash = column_hash.reject { |key, _| key == "as" }
  end

  # Validate the type up front so a bad "as" is reported even for null values.
  unless %i[boolean long double string timestamp json].include?(type)
    raise ArgumentError, "Unknown type #{type}: supported types are boolean, long, double, string, timestamp and json"
  end

  name, value = column_hash.first
  column = { :name => name, :value => value, :type => type }

  # In the following case, this plugin inserts null
  # e.g., { type: insert, column: { user_id: null, as: long } }
  return column if value.nil?

  column[:value] =
    case type
    when :boolean
      # The config may carry a real boolean false as well as the string "false".
      value.to_s != "false"
    when :long
      value.to_i
    when :double
      value.to_f
    when :string
      # Non-string scalars (e.g. an unquoted number in YAML) become strings.
      value.to_s
    when :timestamp
      # Time keeps time-of-day and offset (Date.parse would drop them).
      # Strings without an offset are read in the local zone by Time.parse.
      require 'time'
      value.is_a?(Time) ? value : Time.parse(value.to_s)
    when :json
      # An already-structured value (hash/array from the config) is used as-is.
      value.is_a?(String) ? JSON.parse(value) : value
    end

  column
end

.get_columns(columns_array) ⇒ Object

Returns an array of column hashes, one per entry (see `.get_column`).



(source lines 126–128)
# File 'lib/embulk/filter/insert.rb', line 126

# Builds one column definition per entry of the "columns" config array.
#
# @param columns_array [Array<Hash>] entries in the format accepted by .get_column
# @return [Array<Hash>] the column hashes, in the same order as the input
def self.get_columns(columns_array)
  columns_array.map do |entry|
    Insert.get_column(entry)
  end
end

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


(source lines 8–76)
# File 'lib/embulk/filter/insert.rb', line 8

# Filter transaction: validates the config, works out where the new columns
# go, and yields the task together with the output schema.
#
# Config keys read here:
#   column / columns    - exactly one of them must be given
#   at / before / after - at most one; defaults to at: "bottom"
#   ("at" accepts top/head, bottom/tail, or an integer position)
#
# @param config [Object] plugin config; only #param is used here
# @param in_schema [Array] input columns (responding to #name/#index); not modified
# @yieldparam task [Hash] "values" and "position", read back by #init
# @yieldparam out_columns [Array] copy of the schema with the new columns, reindexed
# @raise [ArgumentError] on missing, conflicting or out-of-range parameters
def self.transaction(config, in_schema, &control)
  task = {}

  column = config.param("column", :hash, default: nil)
  columns = config.param("columns", :array, default: nil)
  # ^ = XOR: exactly one of the two must be given
  unless column.nil? ^ columns.nil?
    raise ArgumentError, "Either \"column\" or \"columns\" is needed"
  end

  columns = column ? [Insert.get_column(column)] : Insert.get_columns(columns)
  task["values"] = columns.map { |c| c[:value] }

  at = config.param("at", :string, default: nil)
  before = config.param("before", :string, default: nil)
  after = config.param("after", :string, default: nil)
  at = "bottom" if at.nil? && before.nil? && after.nil?

  if [at, before, after].compact.size != 1
    raise ArgumentError, "Either \"at\", \"before\" or \"after\" is needed"
  end

  task["position"] =
    if at
      case at
      when "top", "head" then 0
      when "bottom", "tail" then in_schema.size
      else
        # Integer() rejects text that #to_i would silently turn into 0.
        pos = Integer(at, 10)
        size = in_schema.size
        # Array#insert pads with nil past the end (and raises below
        # -(size + 1)), which would put nil entries into the schema.
        unless (-(size + 1)..size).cover?(pos)
          raise ArgumentError, "Position #{at} is out of range"
        end
        # Same insertion point as Array#insert with a negative index.
        pos < 0 ? size + pos + 1 : pos
      end
    else
      anchor = before || after
      found = in_schema.find { |c| c.name == anchor }
      raise ArgumentError, "Column #{anchor} is not found" if found.nil?
      before ? found.index : found.index + 1
    end

  # modify column definition; the input columns are duplicated so that
  # renumbering the output schema leaves in_schema's objects untouched
  inserted_schema = columns.map { |c| Column.new(0, c[:name], c[:type]) }
  out_columns = in_schema.map(&:dup)
  out_columns.insert(task["position"], *inserted_schema)

  # renumber index
  out_columns.each_with_index { |c, idx| c.index = idx }

  yield(task, out_columns)
end

Instance Method Details

#add(page) ⇒ Object



(source lines 138–143)
# File 'lib/embulk/filter/insert.rb', line 138

# Inserts the configured values into each record at the configured position
# and hands the record to the page builder.
#
# @param page [#each] yields records; each record is modified in place with
#   Array#insert, so it is assumed to be an Array
def add(page)
  page.each do |row|
    # Array#insert mutates row and returns it, so it can be passed straight on.
    page_builder.add(row.insert(@position, *@values))
  end
end

#close ⇒ Object



(source lines 135–136)
# File 'lib/embulk/filter/insert.rb', line 135

# Intentionally does nothing; returns nil.
def close
  nil
end

#finish ⇒ Object



(source lines 145–147)
# File 'lib/embulk/filter/insert.rb', line 145

# Completes output by calling #finish on the page builder; returns its result.
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



(source lines 130–133)
# File 'lib/embulk/filter/insert.rb', line 130

# Loads the per-task settings stored by .transaction ("values" and "position")
# into instance variables used by #add. Returns the position.
def init
  stored = task
  @values = stored["values"]
  @position = stored["position"]
end