Class: Embulk::Filter::Insert
- Inherits:
-
FilterPlugin
- Object
- FilterPlugin
- Embulk::Filter::Insert
- Defined in:
- lib/embulk/filter/insert.rb
Class Method Summary collapse
-
.get_column(column_hash) ⇒ Object
return { :name => name1, :value => value1, :type => type1 }.
-
.get_columns(columns_array) ⇒ Object
return array of column.
- .transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object
Instance Method Summary collapse
Class Method Details
.get_column(column_hash) ⇒ Object
return { :name => name1, :value => value1, :type => type1 }
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/embulk/filter/insert.rb', line 79 def self.get_column(column_hash) if column_hash.size > 2 raise ArgumentError, "Invalid column parameter: #{column_hash.to_s}" end # default type is string type = :string if column_hash.size == 2 unless column_hash.keys.include?("as") raise ArgumentError, "Invalid column parameter: #{column_hash.to_s}" end type = column_hash["as"].to_sym column_hash = column_hash.select{|k, v| k != "as" } end column = { :name => column_hash.keys.first, :value => column_hash.values.first, :type => type } # In the following case, this plugin inserts null # e.g., { type: insert, column: { user_id: null, as: long } } return column if column[:value].nil? case type when :boolean column[:value] = (column[:value] != "false") when :long column[:value] = column[:value].to_i when :double column[:value] = column[:value].to_f when :string # do nothing when :timestamp column[:value] = Date.parse(column[:value]) when :json column[:value] = JSON.parse(column[:value]) else raise ArgumentError, "Unknown type #{type}: supported types are boolean, long, double, string, timestamp and json" end column end |
.get_columns(columns_array) ⇒ Object
return array of column
126 127 128 |
# File 'lib/embulk/filter/insert.rb', line 126 def self.get_columns(columns_array) columns_array.map{|column_hash| Insert.get_column(column_hash) } end |
.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/embulk/filter/insert.rb', line 8 def self.transaction(config, in_schema, &control) task = {} column = config.param("column", :hash, default: nil) columns = config.param("columns", :array, default: nil) # ^ = XOR unless (column.nil? ^ columns.nil?) raise ArgumentError, "Either \"column\" or \"columns\" is needed" end if column columns = [ Insert.get_column(column) ] else columns = Insert.get_columns(columns) end task["values"] = columns.map{|c| c[:value] } at = config.param("at", :string, default: nil) before = config.param("before", :string, default: nil) after = config.param("after", :string, default: nil) if at.nil? and before.nil? and after.nil? at = "bottom" end no_of_position_param = 0 no_of_position_param += 1 unless at.nil? no_of_position_param += 1 unless before.nil? no_of_position_param += 1 unless after.nil? unless no_of_position_param == 1 raise ArgumentError, "Either \"at\", \"before\" or \"after\" is needed" end if at case at when "top", "head" task["position"] = 0 when "bottom", "tail" task["position"] = in_schema.size else task["position"] = at.to_i end elsif before schema_cols = in_schema.select{|c| c.name == before } if schema_cols.empty? raise ArgumentError, "Column #{before} is not found" end task["position"] = schema_cols[0].index else schema_cols = in_schema.select{|c| c.name == after } if schema_cols.empty? raise ArgumentError, "Column #{after} is not found" end task["position"] = schema_cols[0].index + 1 end # modify column definition inserted_schema = [] columns.each{|c| inserted_schema.push(Column.new(0, c[:name], c[:type])) } out_columns = in_schema.map{|c| c } out_columns.insert(task["position"], *inserted_schema) # renumber index out_columns.each_with_index{|c, idx| c.index = idx } yield(task, out_columns) end |
Instance Method Details
#add(page) ⇒ Object
138 139 140 141 142 143 |
# File 'lib/embulk/filter/insert.rb', line 138 def add(page) page.each do |record| record.insert(@position, *@values) page_builder.add(record) end end |
#close ⇒ Object
135 136 |
# File 'lib/embulk/filter/insert.rb', line 135 def close end |
#finish ⇒ Object
145 146 147 |
# File 'lib/embulk/filter/insert.rb', line 145 def finish page_builder.finish end |
#init ⇒ Object
130 131 132 133 |
# File 'lib/embulk/filter/insert.rb', line 130 def init @values = task["values"] @position = task["position"] end |