Module: BulkMethodsMixin

Defined in:
lib/bulk_data_methods/bulk_methods_mixin.rb

Overview

MixIn used to extend ActiveRecord::Base classes implementing bulk insert and update operations through #create_many and #update_many.

Examples:

to use:

class Company < ActiveRecord::Base
  extend BulkMethodsMixin
end
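
Once extended, #create_many and #update_many become available as class methods. A minimal usage sketch (the Company model and its name column are assumptions for illustration):

Company.create_many([{ :name => 'Acme' }, { :name => 'Initech' }])
Company.update_many([{ :id => 1, :name => 'Acme Corp' }])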

Defined Under Namespace

Classes: BulkUploadDataInconsistent

Instance Method Summary

Instance Method Details

#create_many(rows, options = {}) ⇒ Array<Hash>

BULK creation of many rows

Examples:

no options used

rows = [
      { :name => 'Keith', :salary => 1000 },
      { :name => 'Alex', :salary => 2000 }
]
Employee.create_many(rows)

with the :returning option to return key values

rows = [
      { :name => 'Keith', :salary => 1000 },
      { :name => 'Alex', :salary => 2000 }
]
options = { :returning => [:id] }
Employee.create_many(rows, options)
[#<Employee id: 1>, #<Employee id: 2>]

with the :slice_size option (will generate two insert queries)

rows = [
      { :name => 'Keith', :salary => 1000 },
      { :name => 'Alex', :salary => 2000 },
      { :name => 'Mark', :salary => 3000 }
]
options = { :slice_size => 2 }
Employee.create_many(rows, options)

Parameters:

  • rows (Array<Hash>)

    ([]) data to be inserted into the database

  • options (Hash) (defaults to: {})

    ({}) options for bulk inserts

Options Hash (options):

  • :slice_size (Integer) — default: 1000

    how many records will be created in a single SQL query

  • :check_consistency (Boolean) — default: true

    ensures some modicum of sanity in the incoming dataset; specifically, that each row defines the same set of keys

  • :returning (Array or String) — default: nil

    list of fields to return.

Returns:

  • (Array<Hash>)

    rows returned from the DB, as requested via the :returning option

Raises:

  • (BulkUploadDataInconsistent)

    raised when the set of keys differs between rows (the check can be disabled by passing :check_consistency => false)
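
A hedged sketch of how the consistency check behaves, based on the source below (the Employee model is taken from the examples above):

rows = [
  { :name => 'Keith', :salary => 1000 },
  { :name => 'Alex' }                   # missing :salary
]
Employee.create_many(rows)
# => raises BulkUploadDataInconsistent
Employee.create_many(rows, :check_consistency => false)
# => no error; the column list comes from the first row, so the missing value is inserted as NULL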



# File 'lib/bulk_data_methods/bulk_methods_mixin.rb', line 50

def create_many(rows, options = {})
  return [] if rows.blank?
  options[:slice_size] = 1000 unless options.has_key?(:slice_size)
  options[:check_consistency] = true unless options.has_key?(:check_consistency)
  returning_clause = ""
  if options[:returning]
    if options[:returning].is_a? Array
      returning_list = options[:returning].join(',')
    else
      returning_list = options[:returning]
    end
    returning_clause = " returning #{returning_list}"
  end
  returning = []

  created_at_value = Time.zone.now

  num_sequences_needed = rows.reject{|r| r[:id].present?}.length
  if num_sequences_needed > 0
    row_ids = connection.next_sequence_values(sequence_name, num_sequences_needed)
  else
    row_ids = []
  end
  rows.each do |row|
    # set the primary key if it needs to be set
    row[:id] ||= row_ids.shift
  end.each do |row|
    # set :created_at if need be
    row[:created_at] ||= created_at_value
  end.group_by do |row|
    respond_to?(:partition_table_name) ? partition_table_name(*partition_key_values(row)) : table_name
  end.each do |table_name, rows_for_table|
    column_names = rows_for_table[0].keys.sort{|a,b| a.to_s <=> b.to_s}
    sql_insert_string = "insert into #{table_name} (#{column_names.join(',')}) values "
    rows_for_table.map do |row|
      if options[:check_consistency]
        row_column_names = row.keys.sort{|a,b| a.to_s <=> b.to_s}
        if column_names != row_column_names
          raise BulkUploadDataInconsistent.new(self, table_name, column_names, row_column_names, "while attempting to build insert statement")
        end
      end
      column_values = column_names.map do |column_name|
        quote_value(row[column_name], columns_hash[column_name.to_s])
      end.join(',')
      "(#{column_values})"
    end.each_slice(options[:slice_size]) do |insert_slice|
      returning += find_by_sql(sql_insert_string + insert_slice.join(',') + returning_clause)
    end
  end
  return returning
end

#update_many(rows = [], options = {}) ⇒ Array<Hash>
#update_many(rows = {}, options = {}) ⇒ Array<Hash>

Note:

Remember that you should probably set updated_at, using "updated_at = datatable.updated_at" or "updated_at = now()" in the :set_array, if you want to follow the standard ActiveRecord convention for timestamp columns (and your table has an updated_at column)

BULK updates of many rows

Examples:

using :set_array to set each employee's salary from the datatable value; the default where clause joins on id, so it works here.

rows = [
  { :id => 1, :salary => 1000 },
  { :id => 10, :salary => 2000 },
  { :id => 23, :salary => 2500 }
]
options = { :set_array => '"salary = datatable.salary"' }
Employee.update_many(rows, options)

using a :where_constraint clause to update only employees whose salary has changed.

rows = [
  { :id => 1, :salary => 1000, :company_id => 10 },
  { :id => 10, :salary => 2000, :company_id => 12 },
  { :id => 23, :salary => 2500, :company_id => 5 }
]
options = {
  :set_array => '"salary = datatable.salary"',
  :where_constraint => '"#{table_name}.salary <> datatable.salary"'
}
Employee.update_many(rows, options)

using a :where_constraint clause to update salary only for active employees

rows = [
  { :id => 1, :salary => 1000, :company_id => 10 },
  { :id => 10, :salary => 2000, :company_id => 12 },
  { :id => 23, :salary => 2500, :company_id => 5 }
]
options = {
  :set_array => '"salary = datatable.salary"',
  :where_constraint => '"#{table_name}.active = true"'
}
Employee.update_many(rows, options)

passing a Hash: the where clause is built from the KEYS of the hash and the set_array is generated from the VALUES

rows = {
  { :id => 1 } => { :salary => 100000, :company_id => 10 },
  { :id => 10 } => { :salary => 110000, :company_id => 12 },
  { :id => 23 } => { :salary => 90000, :company_id => 5 }
}
Employee.update_many(rows)
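
As the note above suggests, update_many does not touch updated_at unless the set clause includes it; a minimal sketch of both approaches suggested there (the Employee model and its updated_at column are assumptions):

rows = [
  { :id => 1, :salary => 1000, :updated_at => Time.zone.now },
  { :id => 10, :salary => 2000, :updated_at => Time.zone.now }
]
Employee.update_many(rows, :set_array => '"salary = datatable.salary, updated_at = datatable.updated_at"')

# or let the database supply the timestamp (no :updated_at needed in the rows):
Employee.update_many([{ :id => 1, :salary => 1000 }, { :id => 10, :salary => 2000 }],
                     :set_array => '"salary = datatable.salary, updated_at = now()"')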

Overloads:

  • #update_many(rows = [], options = {}) ⇒ Array<Hash>

    Parameters:

    • rows (Array<Hash>) (defaults to: [])

      ([]) data to be updated

    Options Hash (options):

    • :set_array (String) — default: built from first row passed in

      the set clause

    • :where_datatable (String) — default: '"#{table_name}.id = datatable.id"'

      the where clause specifying how to join the datatable against the real table

    • :where_constraint (String)

      the rest of the where clause that limits what rows of the table get updated

  • #update_many(rows = {}, options = {}) ⇒ Array<Hash>

    Parameters:

    • rows (Hash<Hash, Hash>) (defaults to: {})

      ({}) data to be updated

    Options Hash (options):

    • :set_array (String) — default: built from the values in the first key/value pair of `rows`

      the set clause

    • :where_datatable (String) — default: '"#{table_name}.id = datatable.id"'

      the where clause specifying how to join the datatable against the real table

    • :where_constraint (String)

      the rest of the where clause that limits what rows of the table get updated

Parameters:

  • options (Hash) (defaults to: {})

    ({}) options for bulk updates

Options Hash (options):

  • :slice_size (Integer) — default: 1000

    how many records will be updated in a single SQL query

  • :check_consistency (Boolean) — default: true

    ensures some modicum of sanity in the incoming dataset; specifically, that each row defines the same set of keys

  • :returning (Array) — default: nil

    list of fields to return.

  • :returning (String) — default: nil

    single field to return.

Returns:

  • (Array<Hash>)

    rows returned from the DB, as requested via the :returning option

Raises:

  • (BulkUploadDataInconsistent)

    raised when the set of keys differs between rows (the check can be disabled by passing :check_consistency => false)
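
A hedged sketch of combining :returning with #update_many, assuming PostgreSQL's RETURNING support and the Employee model from the examples above:

rows = [{ :id => 1, :salary => 1200 }]
Employee.update_many(rows,
                     :set_array => '"salary = datatable.salary"',
                     :returning => [:id, :salary])
# => e.g. [#<Employee id: 1, salary: 1200>]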



# File 'lib/bulk_data_methods/bulk_methods_mixin.rb', line 169

def update_many(rows, options = {})
  return [] if rows.blank?
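  # Hash form: each key hash builds the where clause and each value hash builds the set clause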
  if rows.is_a?(Hash)
    options[:where_datatable] = '"' + rows.keys[0].keys.map{|key| '#{table_name}.' + "#{key} = datatable.#{key}"}.join(' and ') + '"'
    options[:set_array] = '"' + rows.values[0].keys.map{|key| "#{key} = datatable.#{key}"}.join(',') + '"' unless options[:set_array]
    r = []
    rows.each do |key,value|
      r << key.merge(value)
    end
    rows = r
  end
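  # default set clause: every column except the primary key (and any partition keys)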
  unless options[:set_array]
    column_names =  rows[0].keys
    columns_to_remove = [:id]
    columns_to_remove += partition_keys.flatten.map{|k| k.to_sym} if respond_to?(:partition_keys)
    options[:set_array] = '"' + (column_names - columns_to_remove).map{|cn| "#{cn} = datatable.#{cn}"}.join(',') + '"'
  end
  options[:slice_size] = 1000 unless options[:slice_size]
  options[:check_consistency] = true unless options.has_key?(:check_consistency)
  returning_clause = ""
  if options[:returning]
    if options[:returning].is_a?(Array)
      returning_list = options[:returning].map{|r| '#{table_name}.' + r.to_s}.join(',')
    else
      returning_list = options[:returning]
    end
    returning_clause = "\" returning #{returning_list}\""
  end
  where_clause = options[:where_datatable] || '"#{table_name}.id = datatable.id"'
  where_constraint = ""
  if options[:where_constraint]
    where_constraint = '" AND #{eval(options[:where_constraint])}"'
  end
  returning = []

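  # group rows by destination table so partitioned models update the correct child tables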
  rows.group_by do |row|
    respond_to?(:partition_table_name) ? partition_table_name(*partition_key_values(row)) : table_name
  end.each do |table_name, rows_for_table|
    column_names = rows_for_table[0].keys.sort{|a,b| a.to_s <=> b.to_s}
    rows_for_table.each_slice(options[:slice_size]) do |update_slice|
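      # build the datatable as chained UNION SELECTs; only the first row carries explicit casts and column aliases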
      datatable_rows = []
      update_slice.each_with_index do |row,i|
        if options[:check_consistency]
          row_column_names = row.keys.sort{|a,b| a.to_s <=> b.to_s}
          if column_names != row_column_names
            raise BulkUploadDataInconsistent.new(self, table_name, column_names, row_column_names, "while attempting to build update statement")
          end
        end
        datatable_rows << row.map do |column_name,column_value|
          column_name = column_name.to_s
          columns_hash_value = columns_hash[column_name]
          if i == 0
            "#{quote_value(column_value, columns_hash_value)}::#{columns_hash_value.sql_type} as #{column_name}"
          else
            quote_value(column_value, columns_hash_value)
          end
        end.join(',')
      end
      datatable = datatable_rows.join(' union select ')

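      # eval the set/where/returning strings so their #{table_name} interpolation resolves against the target table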
      sql_update_string = <<-SQL
        update #{table_name} set
          #{eval(options[:set_array])}
        from
        (select
          #{datatable}
        ) as datatable
        where
          #{eval(where_clause)}
          #{eval(where_constraint)}
        #{eval(returning_clause)}
      SQL
      returning += find_by_sql(sql_update_string)
    end
  end
  return returning
end