Class: BulkInsert::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/bulk_insert/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/bulk_insert/worker.rb', line 12

# Builds a worker that buffers rows for one table and writes them in batches.
#
# @param connection database connection; it is asked for adapter_name,
#   columns, quote_table_name and quote_column_name
# @param table_name table to insert into (quoted through the connection)
# @param primary_key stored as given for later use when building the query
# @param column_names names of the columns to insert, in insertion order
# @param set_size number of buffered rows kept before a flush
# @param ignore whether the insert statement should ignore failing rows
# @param update_duplicates whether conflicting rows should be updated
# @param return_primary_keys whether query results are collected in result_sets
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
  @statement_adapter = StatementAdapters.adapter_for(connection)

  @connection = connection
  @set_size = set_size
  @adapter_name = connection.adapter_name

  # With INSERT IGNORE, only the rows hitting a duplicate key or a disallowed
  # NULL are rejected, rather than the whole batch of inserts.
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys

  # Index the table's column objects by name so requested columns can be looked up.
  columns_by_name = connection.columns(table_name).each_with_object({}) do |column, lookup|
    lookup[column.name] = column
  end

  @primary_key = primary_key
  # A requested name that the table does not have maps to nil here.
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)
  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  @before_save_callback = nil
  @after_save_callback = nil

  @result_sets = []
  @set = []
end

Instance Attribute Details

#adapter_name ⇒ Object

Returns the value of attribute adapter_name.



9
10
11
# File 'lib/bulk_insert/worker.rb', line 9

# Reader for the adapter name captured from the connection
# (connection.adapter_name) when the worker was constructed.
#
# @return [Object] the current value of @adapter_name
def adapter_name
  @adapter_name
end

#after_save_callback ⇒ Object

Returns the value of attribute after_save_callback.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Reader for the callback that save! invokes, with no arguments, after the
# query has been executed.
#
# @return [Object, nil] the stored callback; nil until one is assigned
def after_save_callback
  @after_save_callback
end

#before_save_callback ⇒ Object

Returns the value of attribute before_save_callback.



7
8
9
# File 'lib/bulk_insert/worker.rb', line 7

# Reader for the callback that save! invokes with the pending rows before the
# query is executed.
#
# @return [Object, nil] the stored callback; nil until one is assigned
def before_save_callback
  @before_save_callback
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/bulk_insert/worker.rb', line 5

# Reader for the database connection passed to the constructor; it is the
# object used to quote values and execute the insert query.
#
# @return [Object] the current value of @connection
def connection
  @connection
end

#ignore ⇒ Object (readonly)

Returns the value of attribute ignore.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the ignore flag given to the constructor. When truthy, the
# insert statement includes the adapter's "insert ignore" fragment.
#
# @return [Object] the current value of @ignore
def ignore
  @ignore
end

#result_sets ⇒ Object (readonly)

Returns the value of attribute result_sets.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the collected query results. execute_query appends one entry per
# executed query, and only when primary keys were requested.
#
# @return [Array] the current value of @result_sets
def result_sets
  @result_sets
end

#set_size ⇒ Object

Returns the value of attribute set_size.



6
7
8
# File 'lib/bulk_insert/worker.rb', line 6

# Reader for the batch size. add flushes the pending rows with save! once
# their number has reached this value.
#
# @return [Object] the current value of @set_size (500 unless overridden)
def set_size
  @set_size
end

#update_duplicates ⇒ Object (readonly)

Returns the value of attribute update_duplicates.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the update_duplicates flag given to the constructor; it is passed
# to the statement adapter when the conflict clause of the query is built.
#
# @return [Object] the current value of @update_duplicates
def update_duplicates
  @update_duplicates
end

Instance Method Details

#add(values) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/bulk_insert/worker.rb', line 47

# Queues one row for insertion, flushing the pending rows first when the
# buffer has already reached set_size.
#
# For every configured column the value is taken from +values+ when supplied.
# Otherwise it falls back to the column's default (any non-nil default,
# including blank ones such as "" or false), then to a timestamp placeholder
# for created_at/updated_at, and finally to nil.
#
# @param values [Hash, Array] a Hash keyed by column name (string or symbol),
#   or an Array-like list ordered like the configured columns
# @return [self] so calls can be chained
def add(values)
  save! if @set.length >= set_size

  keyed = values.is_a?(Hash)
  values = values.with_indifferent_access if keyed

  mapped = @columns.each_with_index.map do |column, index|
    supplied = keyed ? values.key?(column.name) : index < values.length

    if supplied
      keyed ? values[column.name] : values[index]
    elsif !column.default.nil?
      # A nil check (rather than a blank check) keeps legitimate blank
      # defaults; otherwise NULL would be inserted in their place.
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      # Replaced with the current time when the insert query is composed.
      :__timestamp_placeholder
    end
  end

  @set.push(mapped)
  self
end

#add_all(rows) ⇒ Object



70
71
72
73
# File 'lib/bulk_insert/worker.rb', line 70

# Queues every row of +rows+ by passing each one to add, in order.
#
# @param rows an enumerable of rows, each in a form accepted by add
# @return [self] so calls can be chained
def add_all(rows)
  rows.each do |values|
    add(values)
  end

  self
end

#after_save(&block) ⇒ Object



79
80
81
# File 'lib/bulk_insert/worker.rb', line 79

# Registers the block that save! calls, with no arguments, after the query has
# been executed. Calling this without a block stores nil, which removes any
# previously registered callback.
#
# @return [Proc, nil] the block that was stored
def after_save(&block)
  @after_save_callback = block
end

#before_save(&block) ⇒ Object



75
76
77
# File 'lib/bulk_insert/worker.rb', line 75

# Registers the block that save! calls with the pending rows before the query
# is executed. Calling this without a block stores nil, which removes any
# previously registered callback.
#
# @return [Proc, nil] the block that was stored
def before_save(&block)
  @before_save_callback = block
end

#compose_insert_query ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/bulk_insert/worker.rb', line 101

# Builds the complete INSERT statement for the pending rows.
#
# Each value is quoted through the connection. Timestamp placeholders are
# replaced with the time captured at the start of this call (kept in @now).
# The adapter's conflict clause and, when primary keys were requested, its
# primary-key return clause are appended after the value tuples.
#
# @return [String, false] the SQL text, or false when no rows are pending
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now

  # Compare the numeric major version. Comparing the version *string* is
  # lexicographic, so "10.0.0" would wrongly sort before "5.0.0".
  serialize_values = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    quoted = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if serialize_values
        # Columns unknown to the table are nil and are quoted without casting.
        value = @connection.lookup_cast_type_from_column(column).serialize(value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{quoted.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates)
  sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys
  sql
end

#execute_query ⇒ Object



94
95
96
97
98
99
# File 'lib/bulk_insert/worker.rb', line 94

# Composes the insert query and runs it on the connection. Nothing is executed
# when compose_insert_query yields no statement.
#
# The result of the query is appended to @result_sets only when primary keys
# were requested.
#
# @return [Array, nil] @result_sets when a result was recorded, otherwise nil
def execute_query
  statement = compose_insert_query
  return unless statement

  outcome = @connection.exec_query(statement)
  @result_sets << outcome if @return_primary_keys
end

#insert_sql_statement ⇒ Object



134
135
136
137
# File 'lib/bulk_insert/worker.rb', line 134

# Builds the leading part of the insert statement, up to and including
# "VALUES ". The adapter's "insert ignore" fragment is placed between INSERT
# and INTO when the ignore flag is set; otherwise that slot is left empty.
#
# @return [String] the statement prefix, ready for value tuples to be appended
def insert_sql_statement
  modifier =
    if @ignore
      @statement_adapter.insert_ignore_statement
    else
      ''
    end

  target = "#{@table_name} (#{@column_names})"
  "INSERT #{modifier} INTO #{target} VALUES "
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/bulk_insert/worker.rb', line 39

# Tells whether any rows are buffered and waiting to be saved.
#
# This checks emptiness rather than element truthiness, so the answer always
# agrees with pending_count being greater than zero.
#
# @return [Boolean] true when at least one row is pending
def pending?
  !@set.empty?
end

#pending_count ⇒ Object



43
44
45
# File 'lib/bulk_insert/worker.rb', line 43

# Number of rows currently buffered and not yet saved.
#
# @return [Integer] the size of the pending set
def pending_count
  @set.size
end

#save! ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/bulk_insert/worker.rb', line 83

# Writes the pending rows, if any, and empties the buffer.
#
# The before-save callback (when set) receives the pending rows, then the
# query is executed, then the after-save callback (when set) is called with no
# arguments. The buffer is cleared only after all of that has run.
#
# @return [self] so calls can be chained
def save!
  return self unless pending?

  @before_save_callback.call(@set) if @before_save_callback
  execute_query
  @after_save_callback.call if @after_save_callback
  @set.clear

  self
end