Class: BulkInsert::Worker

Inherits: Object
Defined in:
lib/bulk_insert/worker.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/bulk_insert/worker.rb', line 10

# Prepare a worker that buffers rows for one table and inserts them in batches.
#
# @param connection the connection used for column lookup, identifier
#   quoting and executing each batch
# @param table_name [String, Symbol] target table
# @param primary_key key column name, used only for the RETURNING clause when
#   return_primary_keys is set
# @param column_names [Array<String, Symbol>] the columns every row supplies, in order
# @param set_size [Integer] buffered-row count at which #add flushes via #save!
# @param ignore [Boolean] skip conflicting rows instead of failing the whole
#   statement (INSERT IGNORE / OR IGNORE / ON CONFLICT DO NOTHING)
# @param update_duplicates [Boolean] on MySQL, update existing rows on duplicate keys
# @param return_primary_keys [Boolean] keep each batch's result set in #result_sets
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
  @connection = connection
  @set_size = set_size

  @adapter_name = connection.adapter_name
  # With ignore on, only the offending inserts (duplicate keys, disallowed
  # NULLs) are dropped rather than the whole batch.
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys

  # Index the table's column metadata by name; a requested column that is not
  # found resolves to nil.
  columns_by_name = {}
  connection.columns(table_name).each { |col| columns_by_name[col.name] = col }

  @primary_key = primary_key
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)
  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  # Callbacks are registered later through #before_save / #after_save.
  @before_save_callback = nil
  @after_save_callback = nil

  @result_sets = []
  @set = []
end

Instance Attribute Details

#adapter_name ⇒ Object

Returns the value of attribute adapter_name.



7
8
9
# File 'lib/bulk_insert/worker.rb', line 7

# Adapter name read from the connection at construction; the SQL builders
# match it against mysql / SQLite / PostgreSQL|PostGIS patterns.
def adapter_name; @adapter_name; end

#after_save_callback ⇒ Object

Returns the value of attribute after_save_callback.



6
7
8
# File 'lib/bulk_insert/worker.rb', line 6

# Block registered with #after_save, or nil. #save! calls it with no
# arguments after a batch has been executed.
def after_save_callback; @after_save_callback; end

#before_save_callback ⇒ Object

Returns the value of attribute before_save_callback.



5
6
7
# File 'lib/bulk_insert/worker.rb', line 5

# Block registered with #before_save, or nil. #save! calls it with the
# pending rows just before the batch is executed.
def before_save_callback; @before_save_callback; end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/bulk_insert/worker.rb', line 3

# The connection given to the constructor; used for quoting values and
# identifiers and for executing each batch.
def connection; @connection; end

#ignore ⇒ Object (readonly)

Returns the value of attribute ignore.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Whether conflicting rows should be skipped instead of failing the batch
# (drives #insert_ignore and the PostgreSQL ON CONFLICT clause).
def ignore; @ignore; end

#result_sets ⇒ Object (readonly)

Returns the value of attribute result_sets.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Results returned by exec_query for each executed batch; only appended to
# when return_primary_keys was requested.
def result_sets; @result_sets; end

#set_size ⇒ Object

Returns the value of attribute set_size.



4
5
6
# File 'lib/bulk_insert/worker.rb', line 4

# Number of buffered rows at which #add flushes the batch through #save!.
def set_size; @set_size; end

#update_duplicates ⇒ Object (readonly)

Returns the value of attribute update_duplicates.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Whether MySQL inserts should update existing rows on duplicate keys
# (drives the ON DUPLICATE KEY UPDATE clause).
def update_duplicates; @update_duplicates; end

Instance Method Details

#add(values) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/bulk_insert/worker.rb', line 43

# Buffer one row for insertion, first flushing the current batch via #save!
# when it already holds set_size rows.
#
# Columns the row does not supply are filled from the column's default; when
# there is none, created_at/updated_at get a placeholder that
# #compose_insert_query swaps for the batch timestamp, and anything else is nil.
#
# @param values [Hash, Array] a row keyed by column name (String or Symbol
#   keys), or positional values in the worker's column order
# @return [self]
def add(values)
  save! if @set.length >= set_size

  keyed = values.is_a?(Hash)
  # Column names are strings; indifferent access lets callers use symbol keys.
  values = values.with_indifferent_access if keyed

  mapped = @columns.each_with_index.map do |column, index|
    value_given = keyed ? values.key?(column.name) : index < values.length

    if value_given
      keyed ? values[column.name] : values[index]
    elsif !column.default.nil?
      # nil? rather than present?: defaults such as false or "" are real
      # values and must not be replaced with NULL.
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      :__timestamp_placeholder
    end
  end

  @set.push(mapped)
  self
end

#add_all(rows) ⇒ Object



66
67
68
69
# File 'lib/bulk_insert/worker.rb', line 66

# Buffer every row of +rows+ by passing each one to #add, in order.
#
# @param rows [Enumerable<Hash, Array>] rows in any form #add accepts
# @return [self]
def add_all(rows)
  rows.each do |row|
    add(row)
  end
  self
end

#after_save(&block) ⇒ Object



75
76
77
# File 'lib/bulk_insert/worker.rb', line 75

# Register the block #save! runs (with no arguments) after each batch is
# executed. Replaces any earlier registration; calling without a block clears it.
#
# @return [Proc, nil] the stored block
def after_save(&callback)
  @after_save_callback = callback
end

#before_save(&block) ⇒ Object



71
72
73
# File 'lib/bulk_insert/worker.rb', line 71

# Register the block #save! runs with the pending rows just before each
# batch is executed. Replaces any earlier registration; calling without a
# block clears it.
#
# @return [Proc, nil] the stored block
def before_save(&callback)
  @before_save_callback = callback
end

#compose_insert_query ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/bulk_insert/worker.rb', line 97

# Build one multi-row INSERT statement for every row buffered in @set.
#
# Each value is paired with its column object and quoted through the
# connection. The :__timestamp_placeholder marker left by #add is replaced by
# a single Time.now captured for the whole batch (also kept in @now).
#
# @return [String, false] the complete SQL, or false when nothing is buffered
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now
  # Compare the major version numerically: comparing version strings
  # lexically ranks e.g. "10.0.0" below "5.0.0". It cannot change within a
  # batch, so decide once instead of per value.
  cast_before_quote = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    quoted = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if cast_before_quote
        # On ActiveRecord 5+ the column is applied through
        # type_cast_from_column and quote receives only the value.
        value = @connection.type_cast_from_column(column, value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end
    "(#{quoted.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << on_conflict_statement
  sql << primary_key_return_statement
  sql
end

#execute_query ⇒ Object



90
91
92
93
94
95
# File 'lib/bulk_insert/worker.rb', line 90

# Run the INSERT for the buffered rows, if there are any, and keep the
# result set when return_primary_keys was requested.
#
# @return [Array, nil] @result_sets when a result was kept, otherwise nil
def execute_query
  statement = compose_insert_query
  return unless statement

  outcome = @connection.exec_query(statement)
  @result_sets << outcome if @return_primary_keys
end

#insert_ignore ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/bulk_insert/worker.rb', line 131

# Keyword placed between INSERT and INTO when +ignore+ is set.
#
# @return [String, nil] "IGNORE" for MySQL adapters, "OR IGNORE" for SQLite,
#   "" for other adapters (PostgreSQL/PostGIS get ON CONFLICT DO NOTHING from
#   #on_conflict_statement instead), or nil when ignore is off
def insert_ignore
  return unless ignore

  # \A anchors at the start of the whole name (^ would match after any
  # newline), consistent with the other adapter checks in this class.
  case adapter_name
  when /\Amysql/i then 'IGNORE'
  when /\ASQLite/i then 'OR IGNORE'
  else '' # Not supported
  end
end

#insert_sql_statement ⇒ Object



127
128
129
# File 'lib/bulk_insert/worker.rb', line 127

# Leading part of the INSERT statement, up to and including "VALUES ".
# Returns a fresh, unfrozen String because #compose_insert_query appends to it.
def insert_sql_statement
  modifier = insert_ignore
  column_list = "(#{@column_names})"
  "INSERT #{modifier} INTO #{@table_name} #{column_list} VALUES "
end

#on_conflict_statement ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/bulk_insert/worker.rb', line 152

# Clause appended after the VALUES list to handle key conflicts.
#
# * PostgreSQL/PostGIS with +ignore+: " ON CONFLICT DO NOTHING"
# * MySQL with +update_duplicates+: " ON DUPLICATE KEY UPDATE c=VALUES(c), ..."
#   covering every column the worker inserts
# * otherwise: ""
#
# @return [String]
def on_conflict_statement
  if adapter_name =~ /\APost(?:greSQL|GIS)/i && ignore
    ' ON CONFLICT DO NOTHING'
  elsif adapter_name =~ /\Amysql/i && update_duplicates
    # Quote through the connection, as the INSERT column list is, so names
    # containing the quote character are escaped instead of breaking the SQL.
    update_values = @columns.map do |column|
      quoted = @connection.quote_column_name(column.name)
      "#{quoted}=VALUES(#{quoted})"
    end.join(', ')
    " ON DUPLICATE KEY UPDATE #{update_values}"
  else
    ''
  end
end

#pending? ⇒ Boolean



35
36
37
# File 'lib/bulk_insert/worker.rb', line 35

# True when at least one buffered row is waiting to be inserted.
# Rows pushed by #add are Arrays, so any buffered row counts.
def pending?
  @set.any? { |row| row }
end

#pending_count ⇒ Object



39
40
41
# File 'lib/bulk_insert/worker.rb', line 39

# Number of rows buffered and not yet inserted.
#
# @return [Integer]
def pending_count
  @set.size
end

#primary_key_return_statement ⇒ Object



144
145
146
147
148
149
150
# File 'lib/bulk_insert/worker.rb', line 144

# RETURNING clause that makes PostgreSQL/PostGIS send back the primary key of
# each inserted row. Empty unless return_primary_keys was requested and the
# adapter is PostgreSQL/PostGIS.
#
# @return [String]
def primary_key_return_statement
  if @return_primary_keys && adapter_name =~ /\APost(?:greSQL|GIS)/i
    # Quote like the INSERT column list so reserved or mixed-case key names
    # work; Array() also accepts a list of key columns (composite keys).
    key_list = Array(@primary_key).map { |key| @connection.quote_column_name(key) }.join(', ')
    " RETURNING #{key_list}"
  else
    ''
  end
end

#save! ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/bulk_insert/worker.rb', line 79

# Flush the buffered rows: run the before-save callback with the rows,
# execute the INSERT, run the after-save callback, then empty the buffer.
# Does nothing when no rows are pending. If execute_query raises, the buffer
# is left intact.
#
# @return [self]
def save!
  return self unless pending?

  @before_save_callback.call(@set) if @before_save_callback
  execute_query
  @after_save_callback.call if @after_save_callback
  @set.clear
  self
end