Class: BulkInsert::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/bulk_insert/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, table_name, column_names, set_size = 500) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/bulk_insert/worker.rb', line 7

# Builds a worker that queues rows for one table and writes them in batches.
#
# @param connection [Object] adapter used to look up columns and to quote
#   the table and column names
# @param table_name [String, Symbol] table the rows will be inserted into
# @param column_names [Array<String, Symbol>] columns to populate, in the
#   order positional row values will be supplied
# @param set_size [Integer] queue length at which #add flushes before
#   queuing another row
def initialize(connection, table_name, column_names, set_size = 500)
  @connection = connection
  @set_size = set_size

  # Index the table's column objects by name so each requested column can
  # be resolved with a single lookup.
  columns_by_name = connection.columns(table_name).each_with_object({}) do |column, index|
    index[column.name] = column
  end

  # A requested name the table does not report resolves to nil here.
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)

  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  @after_save_callback = nil
  @set = []
end

Instance Attribute Details

#after_save_callback ⇒ Object

Returns the value of attribute after_save_callback.



5
6
7
# File 'lib/bulk_insert/worker.rb', line 5

# Returns the callback that #save! invokes after executing a batch.
#
# The value is nil until a block is stored (the constructor sets it to nil;
# #after_save assigns the given block).
#
# @return [Proc, nil] the stored callback, if any
def after_save_callback
  @after_save_callback
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/bulk_insert/worker.rb', line 3

# Returns the connection object given to the constructor, which #save! uses
# to quote values and execute the INSERT statement.
#
# @return [Object] the connection passed to #initialize
def connection
  @connection
end

#set_size ⇒ Object

Returns the value of attribute set_size.



4
5
6
# File 'lib/bulk_insert/worker.rb', line 4

# Returns the batch threshold: #add flushes the queue via #save! once the
# number of queued rows has reached this value.
#
# @return [Integer] the threshold given to #initialize (500 by default)
def set_size
  @set_size
end

Instance Method Details

#add(values) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/bulk_insert/worker.rb', line 31

# Queues one row for insertion, flushing the queue first when it is full.
#
# Values may be given positionally (an Array in column order) or by name
# (a Hash keyed by column name). A column with no supplied value falls back
# to its default when that default is present; otherwise "created_at" and
# "updated_at" receive a placeholder that #save! replaces with the current
# time, and every other column receives nil.
#
# @param values [Array, Hash] the row's values
# @return [self] the worker, so calls can be chained
def add(values)
  save! if @set.length >= set_size

  by_name = values.is_a?(Hash)
  values = values.with_indifferent_access if by_name

  row = @columns.each_with_index.map do |column, position|
    supplied = by_name ? values.key?(column.name) : position < values.length

    if supplied
      by_name ? values[column.name] : values[position]
    elsif column.default.present?
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      # Resolved to a single shared timestamp when the batch is written.
      :__timestamp_placeholder
    end
  end

  @set.push(row)
  self
end

#add_all(rows) ⇒ Object



54
55
56
57
# File 'lib/bulk_insert/worker.rb', line 54

# Queues every row of a collection by passing each one to #add in order.
#
# @param rows [#each] collection of rows, each in a form #add accepts
# @return [self] the worker, so calls can be chained
def add_all(rows)
  rows.each do |values|
    add(values)
  end

  self
end

#after_save(&block) ⇒ Object



59
60
61
# File 'lib/bulk_insert/worker.rb', line 59

# Stores a block for #save! to invoke, with no arguments, after it has
# executed the INSERT statement for a batch. A later call replaces the
# previously stored block.
#
# @yield invoked by #save! after each executed batch
# @return [Proc, nil] the stored block (nil when no block is given)
def after_save(&block)
  @after_save_callback = block
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/bulk_insert/worker.rb', line 23

# Reports whether any rows are queued and waiting to be written.
#
# @return [Boolean] true when at least one row is queued
def pending?
  !@set.empty?
end

#pending_count ⇒ Object



27
28
29
# File 'lib/bulk_insert/worker.rb', line 27

# Returns how many rows are currently queued and not yet written.
#
# @return [Integer] number of queued rows
def pending_count
  @set.size
end

#save! ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/bulk_insert/worker.rb', line 63

# Writes every queued row to the database in one multi-row INSERT statement,
# invokes the after-save callback (if one is stored), and empties the queue.
# Does nothing when no rows are queued.
#
# Timestamp placeholders queued by #add are replaced with a single
# Time.now value, so all rows in a batch share the same timestamp.
#
# @return [self] the worker, so calls can be chained
def save!
  return self unless pending?

  # Compare the numeric major version. Comparing VERSION::STRING lexically
  # would sort "10.0.0" below "5.0.0" and select the legacy quoting path.
  cast_before_quote = ActiveRecord::VERSION::MAJOR >= 5
  @now = Time.now

  rows = @set.map do |row|
    quoted = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if cast_before_quote
        # A nil column means the name was not found on the table; the value
        # is then quoted without a column-specific type cast.
        value = @connection.type_cast_from_column(column, value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{quoted.join(',')})"
  end

  sql = "INSERT INTO #{@table_name} (#{@column_names}) VALUES #{rows.join(',')}"
  @connection.execute(sql)

  @after_save_callback.() if @after_save_callback

  # Cleared last, so the callback still sees the rows that were just written.
  @set.clear
  self
end