Class: CanvasSync::Importers::BulkImporter

Inherits:
Object
  • Object
show all
Defined in:
lib/canvas_sync/importers/bulk_importer.rb

Constant Summary collapse

DEFAULT_BATCH_SIZE =

The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable

10_000

Class Method Summary collapse

Class Method Details

.batch_sizeObject



83
84
85
86
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 83

# Resolves the batch size used when chunking rows for import.
#
# Reads the BULK_IMPORTER_BATCH_SIZE environment variable; any value that
# does not coerce to a positive integer (unset, "0", non-numeric) falls
# back to DEFAULT_BATCH_SIZE.
#
# @return [Integer] the effective batch size
def self.batch_size
  configured = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
  configured.positive? ? configured : DEFAULT_BATCH_SIZE
end

.condition_sql(klass, columns) ⇒ Object

This method generates SQL that looks like: (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)

This prevents activerecord-import from setting the `updated_at` column for rows that haven't actually changed. This allows you to query for rows that have changed by doing something like:

started_at = Time.now; run_the_users_sync!; changed = User.where("updated_at >= ?", started_at)



77
78
79
80
81
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 77

# Builds the PostgreSQL upsert condition used by activerecord-import, e.g.:
#   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
# so a conflicting row is only updated (touching updated_at) when at least
# one mapped column actually differs from the incoming values.
#
# @param klass [Class] the model class whose quoted table name prefixes each column
# @param columns [Array<Symbol,String>] the database column names being imported
# @return [String] the SQL condition fragment
def self.condition_sql(klass, columns)
  table_refs = columns.map { |column| "#{klass.quoted_table_name}.#{column}" }
  excluded_refs = columns.map { |column| "EXCLUDED.#{column}" }
  "(#{table_refs.join(', ')}) IS DISTINCT FROM (#{excluded_refs.join(', ')})"
end

.import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object

Does a bulk import of a set of models using the activerecord-import gem.

Parameters:

  • report_file_path (String)

    path to the report CSV

  • mapping (Hash)

a hash of the values to import. See `model_mappings.yml` for a format example

  • klass (Object)

    e.g., User

  • conflict_target (Symbol)

    the csv column name that maps to the database column that will determine if we need to update or insert a given row. e.g.,: canvas_user_id

  • import_args (Hash) (defaults to: {})

    Any arguments passed here will be passed through to ActiveRecord::BulkImport. Note: passing the key [:on_duplicate_key_ignore] will override the default behavior of [:on_duplicate_key_update]

Yield Parameters:

  • row (Array)

    if a block is passed in it will yield the current row from the CSV. This can be used if you need to filter or massage the data in any way.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 20

# Streams a report CSV and bulk-imports the rows into +klass+ via
# activerecord-import, flushing in chunks of +batch_size+ rows.
#
# @param report_file_path [String] path to the report CSV
# @param mapping [Hash] csv column => { database_column_name:, type: } mapping
# @param klass [Class] the model to import into, e.g. User
# @param conflict_target [Symbol] csv column whose database column decides insert vs. update
# @param import_args [Hash] forwarded to activerecord-import; passing
#   :on_duplicate_key_ignore replaces the default :on_duplicate_key_update behavior
# @yieldparam row [CSV::Row] each parsed row; the block may transform it or
#   return nil to skip it entirely
def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/LineLength
  csv_columns = mapping.keys
  db_columns = mapping.values.map { |value| value[:database_column_name] }
  conflict_column = mapping[conflict_target][:database_column_name]

  pending_rows = []
  seen_conflict_ids = {}

  CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
    # The caller's block may massage the row, or drop it by returning nil.
    row = yield(row) if block_given?
    next if row.nil?

    # Deduplicate on the conflict target within a batch; duplicates in one
    # INSERT statement would make the upsert fail.
    next if seen_conflict_ids[row[conflict_target]]
    seen_conflict_ids[row[conflict_target]] = true

    converted = csv_columns.map do |column|
      if mapping[column][:type].to_sym == :datetime
        # TODO: add some timezone config to the mapping.
        # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
        DateTime.parse(row[column]).utc rescue nil # rubocop:disable Style/RescueModifier
      else
        row[column]
      end
    end
    pending_rows << converted

    # Flush a full batch and start tracking dedup state afresh.
    if pending_rows.length >= batch_size
      perform_import(klass, db_columns, pending_rows, conflict_column, import_args)
      pending_rows = []
      seen_conflict_ids = {}
    end
  end

  # Flush whatever remains after the final partial batch.
  perform_import(klass, db_columns, pending_rows, conflict_column, import_args)
end

.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 53

# Imports one batch of rows into +klass+ using activerecord-import.
#
# By default performs an upsert on +conflict_target+, updating only rows
# whose mapped columns actually changed (see .condition_sql). If the caller
# passes :on_duplicate_key_ignore in +import_args+, the default
# :on_duplicate_key_update option is removed so the two don't conflict.
#
# @param klass [Class] the model to import into
# @param columns [Array] database column names matching each row's values
# @param rows [Array<Array>] the row values to import
# @param conflict_target [Symbol, String] database column used for conflict detection
# @param import_args [Hash] extra options merged over the defaults
def self.perform_import(klass, columns, rows, conflict_target, import_args = {})
  # Nothing to do for an empty batch (e.g. the final flush).
  return if rows.empty?

  columns = columns.dup

  defaults = {
    validate: false,
    on_duplicate_key_update: {
      conflict_target: conflict_target,
      condition: condition_sql(klass, columns),
      columns: columns
    }
  }
  options = defaults.merge(import_args)
  # :on_duplicate_key_ignore and :on_duplicate_key_update are mutually
  # exclusive; the caller-supplied ignore wins over our default update.
  options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)

  klass.import(columns, rows, options)
end