Class: BulkInsert::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/bulk_insert/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/bulk_insert/worker.rb', line 12

# Sets up a worker that buffers rows for one table and flushes them in batches.
#
# connection          - database connection; asked for its adapter name, the
#                       table's columns, and identifier quoting.
# table_name          - unquoted table name (stored quoted in @table_name).
# primary_key         - stored as-is in @primary_key.
# column_names        - names of the columns to insert, in value order.
# set_size            - buffered-row threshold checked by #add (default 500).
# ignore              - stored in @ignore (default false).
# update_duplicates   - stored in @update_duplicates (default false).
# return_primary_keys - stored in @return_primary_keys (default false).
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
  @statement_adapter = StatementAdapters.adapter_for(connection)
  @connection = connection
  @set_size = set_size
  @adapter_name = connection.adapter_name

  # INSERT IGNORE only fails inserts with duplicate keys or unallowed nulls,
  # not the whole set of inserts.
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys
  @primary_key = primary_key

  # Index the table's column objects by name so the requested columns can be
  # looked up; a requested name that is not in the table maps to nil.
  columns_by_name = connection.columns(table_name).each_with_object({}) do |column, lookup|
    lookup[column.name] = column
  end
  @columns = column_names.map { |column_name| columns_by_name[column_name.to_s] }

  @table_name = connection.quote_table_name(table_name)
  quoted_names = column_names.map { |column_name| connection.quote_column_name(column_name) }
  @column_names = quoted_names.join(",")

  @before_save_callback = nil
  @after_save_callback = nil

  @result_sets = []
  @set = []
end

Instance Attribute Details

#adapter_name ⇒ Object

Returns the value of attribute adapter_name.



9
10
11
# File 'lib/bulk_insert/worker.rb', line 9

# Reader for @adapter_name, which the constructor fills from
# connection.adapter_name.
def adapter_name
  @adapter_name
end

#after_save_callback ⇒ Object

Returns the value of attribute after_save_callback.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Reader for @after_save_callback: the callable that #save! invokes (with no
# arguments) after the query has run, or nil when none is registered.
def after_save_callback
  @after_save_callback
end

#before_save_callback ⇒ Object

Returns the value of attribute before_save_callback.



7
8
9
# File 'lib/bulk_insert/worker.rb', line 7

# Reader for @before_save_callback: the callable that #save! invokes with the
# buffered rows before the query runs, or nil when none is registered.
def before_save_callback
  @before_save_callback
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/bulk_insert/worker.rb', line 5

# Reader for @connection, the connection object handed to the constructor.
def connection
  @connection
end

#ignore ⇒ Object (readonly)

Returns the value of attribute ignore.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for @ignore, the `ignore` flag given to the constructor.
def ignore
  @ignore
end

#result_sets ⇒ Object (readonly)

Returns the value of attribute result_sets.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for @result_sets, the array to which #execute_query appends each
# query result when primary keys are being returned.
def result_sets
  @result_sets
end

#set_size ⇒ Object

Returns the value of attribute set_size.



6
7
8
# File 'lib/bulk_insert/worker.rb', line 6

# Reader for @set_size, the buffered-row threshold that #add compares against
# before queueing another row.
def set_size
  @set_size
end

#update_duplicates ⇒ Object (readonly)

Returns the value of attribute update_duplicates.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for @update_duplicates, the `update_duplicates` value given to the
# constructor.
def update_duplicates
  @update_duplicates
end

Instance Method Details

#add(values) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/bulk_insert/worker.rb', line 47

# Queues one row for insertion, first flushing the buffer through #save! when
# it already holds set_size rows.
#
# values - either a Hash keyed by column name (string or symbol keys) or an
#          Array of values in the worker's column order.
#
# For a column with no supplied value the row gets, in order of preference:
# the column's default when it is present, a :__timestamp_placeholder marker
# for "created_at"/"updated_at", or nil.
#
# Returns self so calls can be chained.
def add(values)
  save! if @set.length >= set_size

  keyed = values.is_a?(Hash)
  values = values.with_indifferent_access if keyed

  row = @columns.each_with_index.map do |column, position|
    supplied = keyed ? values.key?(column.name) : position < values.length

    if supplied
      keyed ? values[column.name] : values[position]
    elsif column.default.present?
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      :__timestamp_placeholder
    end
  end

  @set.push(row)
  self
end

#add_all(rows) ⇒ Object



70
71
72
73
# File 'lib/bulk_insert/worker.rb', line 70

# Queues every element of `rows` by passing each one to #add, in order.
#
# Returns self so calls can be chained.
def add_all(rows)
  rows.each do |values|
    add(values)
  end
  self
end

#after_save(&block) ⇒ Object



79
80
81
# File 'lib/bulk_insert/worker.rb', line 79

# Registers the block to run after a batch is written: #save! calls it with no
# arguments once the query has executed. A later call replaces the earlier
# block; calling without a block stores nil.
#
# Returns the stored block.
def after_save(&block)
  @after_save_callback = block
end

#before_save(&block) ⇒ Object



75
76
77
# File 'lib/bulk_insert/worker.rb', line 75

# Registers the block to run before a batch is written: #save! calls it with
# the buffered rows just before executing the query. A later call replaces the
# earlier block; calling without a block stores nil.
#
# Returns the stored block.
def before_save(&block)
  @before_save_callback = block
end

#compose_insert_query ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/bulk_insert/worker.rb', line 101

# Builds the multi-row INSERT statement for the rows currently buffered in @set.
#
# The statement starts with #insert_sql_statement, followed by one
# parenthesised, comma-separated tuple of quoted values per buffered row, the
# statement adapter's on-conflict clause and, when @return_primary_keys is
# set, its primary-key return clause.
#
# Any :__timestamp_placeholder value is replaced by a single Time.now captured
# once per call (also stored in @now), so every row of a batch shares the same
# timestamp.
#
# Returns the SQL String, or false when no rows are buffered.
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now

  # Decide the quoting path once, using the numeric major version. Comparing
  # the version *string* is lexicographic, so "10.0.0" would sort below
  # "5.0.0" and wrongly select the legacy path.
  cast_before_quoting = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    quoted = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if cast_before_quoting
        # Columns that were not found in the table are nil and skip the cast.
        # NOTE(review): this passes both the column and the value to
        # lookup_cast_type_from_column and quotes whatever it returns —
        # confirm the connection in use accepts that two-argument form.
        value = @connection.lookup_cast_type_from_column(column, value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{quoted.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates)
  sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys
  sql
end

#execute_query ⇒ Object



94
95
96
97
98
99
# File 'lib/bulk_insert/worker.rb', line 94

# Runs the composed INSERT on the connection. Does nothing when
# #compose_insert_query yields no statement.
#
# When @return_primary_keys is set, the result of exec_query is appended to
# @result_sets.
def execute_query
  query = compose_insert_query
  return unless query

  result_set = @connection.exec_query(query)
  @result_sets.push(result_set) if @return_primary_keys
end

#insert_sql_statement ⇒ Object



131
132
133
134
# File 'lib/bulk_insert/worker.rb', line 131

# Returns the leading part of the INSERT statement, up to and including
# "VALUES ". When @ignore is set, the statement adapter's insert-ignore keyword
# is placed between INSERT and INTO; otherwise that slot is left empty.
def insert_sql_statement
  ignore_clause = ''
  ignore_clause = @statement_adapter.insert_ignore_statement if @ignore

  "INSERT #{ignore_clause} INTO #{@table_name} (#{@column_names}) VALUES "
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/bulk_insert/worker.rb', line 39

# Tells whether any rows are buffered in @set and still waiting to be saved.
#
# Returns true or false.
def pending?
  @set.any?
end

#pending_count ⇒ Object



43
44
45
# File 'lib/bulk_insert/worker.rb', line 43

# Returns the number of rows currently buffered in @set.
def pending_count
  @set.length
end

#save! ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/bulk_insert/worker.rb', line 83

# Flushes the buffered rows, if there are any.
#
# In order: calls the before-save callback with the buffered rows (when one is
# registered), executes the insert query, calls the after-save callback with
# no arguments (when one is registered), then empties the buffer.
#
# Returns self whether or not anything was pending.
def save!
  return self unless pending?

  @before_save_callback.call(@set) if @before_save_callback
  execute_query
  @after_save_callback.call if @after_save_callback
  @set.clear

  self
end