Class: BulkInsert::Worker
- Inherits: Object
- Inheritance chain: Object → BulkInsert::Worker
- Defined in:
- lib/bulk_insert/worker.rb
Instance Attribute Summary collapse
-
#adapter_name ⇒ Object
Returns the value of attribute adapter_name.
-
#after_save_callback ⇒ Object
Returns the value of attribute after_save_callback.
-
#before_save_callback ⇒ Object
Returns the value of attribute before_save_callback.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#ignore ⇒ Object
readonly
Returns the value of attribute ignore.
-
#result_sets ⇒ Object
readonly
Returns the value of attribute result_sets.
-
#set_size ⇒ Object
Returns the value of attribute set_size.
-
#update_duplicates ⇒ Object
readonly
Returns the value of attribute update_duplicates.
Instance Method Summary collapse
- #add(values) ⇒ Object
- #add_all(rows) ⇒ Object
- #after_save(&block) ⇒ Object
- #before_save(&block) ⇒ Object
- #compose_insert_query ⇒ Object
- #execute_query ⇒ Object
-
#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker
constructor
A new instance of Worker.
- #insert_sql_statement ⇒ Object
- #pending? ⇒ Boolean
- #pending_count ⇒ Object
- #save! ⇒ Object
Constructor Details
#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/bulk_insert/worker.rb', line 12
#
# Prepares a worker that buffers rows for one table and later emits them as a
# single multi-row INSERT.
#
# @param connection          the database connection; must respond to
#   adapter_name, columns, quote_table_name and quote_column_name
# @param table_name          name of the target table (unquoted)
# @param primary_key         primary key name, kept for the primary-key return clause
# @param column_names        the columns, in order, that each added row supplies
# @param set_size            buffered-row threshold checked by #add before it flushes
# @param ignore              stored in @ignore; selects the adapter's insert-ignore clause
# @param update_duplicates   stored and handed to the adapter's on-conflict clause
# @param return_primary_keys when truthy, query results are collected in @result_sets
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
  @statement_adapter = StatementAdapters.adapter_for(connection)
  @connection = connection
  @set_size = set_size
  @adapter_name = connection.adapter_name

  # INSERT IGNORE only fails inserts with duplicate keys or unallowed nulls not the whole set of inserts
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys

  # Index the table's column objects by name so the requested names can be resolved.
  columns_by_name = connection.columns(table_name).each_with_object({}) do |column, lookup|
    lookup[column.name] = column
  end

  @primary_key = primary_key
  # A requested name with no matching table column resolves to nil here.
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)

  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  @before_save_callback = @after_save_callback = nil
  @result_sets = []
  @set = []
end
Instance Attribute Details
#adapter_name ⇒ Object
Returns the value of attribute adapter_name.
9 10 11 |
# File 'lib/bulk_insert/worker.rb', line 9 def adapter_name @adapter_name end |
#after_save_callback ⇒ Object
Returns the value of attribute after_save_callback.
8 9 10 |
# File 'lib/bulk_insert/worker.rb', line 8 def after_save_callback @after_save_callback end |
#before_save_callback ⇒ Object
Returns the value of attribute before_save_callback.
7 8 9 |
# File 'lib/bulk_insert/worker.rb', line 7 def before_save_callback @before_save_callback end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
5 6 7 |
# File 'lib/bulk_insert/worker.rb', line 5 def connection @connection end |
#ignore ⇒ Object (readonly)
Returns the value of attribute ignore.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10 def ignore @ignore end |
#result_sets ⇒ Object (readonly)
Returns the value of attribute result_sets.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10 def result_sets @result_sets end |
#set_size ⇒ Object
Returns the value of attribute set_size.
6 7 8 |
# File 'lib/bulk_insert/worker.rb', line 6 def set_size @set_size end |
#update_duplicates ⇒ Object (readonly)
Returns the value of attribute update_duplicates.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10 def update_duplicates @update_duplicates end |
Instance Method Details
#add(values) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/bulk_insert/worker.rb', line 47
#
# Buffers one row. If the buffer has already reached set_size, it is flushed
# with #save! before the new row is appended.
#
# For each configured column the value is chosen as follows:
# 1. the value supplied by the caller (an explicit nil counts as supplied);
# 2. otherwise the column's default, whenever that default is not nil;
# 3. otherwise :__timestamp_placeholder for created_at / updated_at, which
#    #compose_insert_query replaces with the current time;
# 4. otherwise nil.
#
# @param values [Hash, Array] a Hash keyed by column name (string or symbol
#   keys), or an Array whose positions match the configured column order
# @return [self] so that calls can be chained
def add(values)
  save! if @set.length >= set_size

  values = values.with_indifferent_access if values.is_a?(Hash)
  keyed = values.is_a?(Hash)

  mapped = @columns.map.with_index do |column, index|
    supplied = keyed ? values.key?(column.name) : index < values.length

    if supplied
      keyed ? values[column.name] : values[index]
    elsif !column.default.nil?
      # Test for nil rather than blankness: a blank default such as '' is
      # still a real default and must not be replaced by NULL.
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      :__timestamp_placeholder
    end
  end

  @set.push(mapped)
  self
end
#add_all(rows) ⇒ Object
70 71 72 73 |
# File 'lib/bulk_insert/worker.rb', line 70
#
# Buffers every row of the given collection, in order, by passing each one to #add.
#
# @param rows an object responding to #each that yields one row per iteration
# @return [self] so that calls can be chained
def add_all(rows)
  rows.each do |values|
    add(values)
  end

  self
end
#after_save(&block) ⇒ Object
79 80 81 |
# File 'lib/bulk_insert/worker.rb', line 79 def after_save(&block) @after_save_callback = block end |
#before_save(&block) ⇒ Object
75 76 77 |
# File 'lib/bulk_insert/worker.rb', line 75 def before_save(&block) @before_save_callback = block end |
#compose_insert_query ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/bulk_insert/worker.rb', line 101
#
# Builds the multi-row INSERT statement for every row currently buffered in @set.
#
# Each :__timestamp_placeholder value is replaced by a single Time.now captured
# once per call (also kept in @now), so all rows in a batch share one timestamp.
# After the value tuples, the statement adapter's on-conflict clause is
# appended, and its primary-key return clause too when @return_primary_keys is set.
#
# @return [String, false] the SQL text, or false when no rows are buffered
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now

  # Compare the numeric major version. Comparing VERSION::STRING lexically
  # would rank "10.0.0" below "5.0.0" and select the legacy quoting path.
  modern_quoting = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    values = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if modern_quoting
        # lookup_cast_type_from_column takes only the column and returns its
        # type object; that type then serializes the value for the database.
        # A nil column (unknown name) is quoted without a type-specific cast.
        value = @connection.lookup_cast_type_from_column(column).serialize(value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{values.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates)
  sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys
  sql
end
#execute_query ⇒ Object
94 95 96 97 98 99 |
# File 'lib/bulk_insert/worker.rb', line 94
#
# Composes the INSERT for the buffered rows and runs it on the connection.
# Nothing is executed when #compose_insert_query yields a falsy value.
#
# @return [Array, nil] @result_sets when the result was recorded there
#   (i.e. @return_primary_keys is truthy), otherwise nil
def execute_query
  query = compose_insert_query
  return unless query

  outcome = @connection.exec_query(query)
  @result_sets.push(outcome) if @return_primary_keys
end
#insert_sql_statement ⇒ Object
131 132 133 134 |
# File 'lib/bulk_insert/worker.rb', line 131
#
# Produces the leading part of the INSERT statement, up to and including
# "VALUES ". When @ignore is truthy the statement adapter's insert-ignore
# keyword is placed after INSERT; otherwise that slot is left empty (so two
# spaces separate INSERT and INTO).
#
# @return [String] a fresh string the caller may append to
def insert_sql_statement
  ignore_clause =
    if @ignore
      @statement_adapter.insert_ignore_statement
    else
      ''
    end

  "INSERT #{ignore_clause} INTO #{@table_name} (#{@column_names}) VALUES "
end
#pending? ⇒ Boolean
39 40 41 |
# File 'lib/bulk_insert/worker.rb', line 39 def pending? @set.any? end |
#pending_count ⇒ Object
43 44 45 |
# File 'lib/bulk_insert/worker.rb', line 43 def pending_count @set.count end |
#save! ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/bulk_insert/worker.rb', line 83
#
# Writes the buffered rows, if any. When rows are pending, the steps run in
# this order: the before-save callback (given the buffered rows),
# #execute_query, the after-save callback (no arguments), then the buffer is
# emptied. With nothing pending, no callback or query runs.
#
# @return [self] so that calls can be chained
def save!
  return self unless pending?

  @before_save_callback.call(@set) if @before_save_callback
  execute_query
  @after_save_callback.call if @after_save_callback
  @set.clear

  self
end