Class: ActiveRecordTransactioner
- Inherits:
-
Object
- Object
- ActiveRecordTransactioner
- Defined in:
- lib/active-record-transactioner.rb
Constant Summary collapse
- DEFAULT_ARGS =
{ call_args: [], call_method: :save!, transaction_method: :transaction, transaction_size: 1000, threadded: false, max_running_threads: 2, debug: false }
- EMPTY_ARGS =
[]
- ALLOWED_ARGS =
DEFAULT_ARGS.keys
Instance Method Summary collapse
- #bulk_create!(model) ⇒ Object
- #destroy!(model) ⇒ Object
-
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
-
#initialize(args = {}) ⇒ ActiveRecordTransactioner
constructor
A new instance of ActiveRecordTransactioner.
-
#join ⇒ Object
Waits for any remaining running threads.
-
#queue(model, args = {}) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
-
#save!(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
- #threadded? ⇒ Boolean
- #update_column(model, column_name, new_value) ⇒ Object
- #update_columns(model, updates) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ ActiveRecordTransactioner
Returns a new instance of ActiveRecordTransactioner.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/active-record-transactioner.rb', line 18 def initialize(args = {}) args.each_key { |key| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) } @args = DEFAULT_ARGS.merge(args) parse_and_set_args return unless block_given? begin yield self ensure flush join if threadded? end end |
Instance Method Details
#bulk_create!(model) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/active-record-transactioner.rb', line 40 def bulk_create!(model) attributes = model.attributes attributes.delete("id") attributes.delete("created_at") attributes.delete("updated_at") klass = model.class @bulk_creates[klass] ||= [] @bulk_creates[klass] << attributes @count += 1 end |
#destroy!(model) ⇒ Object
61 62 63 |
# File 'lib/active-record-transactioner.rb', line 61 def destroy!(model) queue(model, type: :destroy!) end |
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/active-record-transactioner.rb', line 91 def flush wait_for_threads if threadded? @lock.synchronize do @bulk_creates.each do |klass, attribute_array| if threadded? bulk_insert_attribute_array_threadded(klass, attribute_array) else bulk_insert_attribute_array(klass, attribute_array) end end @models.each do |klass, models| next if models.empty? @models[klass] = [] @count -= models.length if threadded? work_threadded(klass, models) else work_models_through_transaction(klass, models) end end end end |
#join ⇒ Object
Waits for any remaining running threads.
119 120 121 122 123 124 |
# File 'lib/active-record-transactioner.rb', line 119 def join threads_to_join = @lock_threads.synchronize { @threads.clone } debug "Threads to join: #{threads_to_join}" if @debug threads_to_join.each(&:join) end |
#queue(model, args = {}) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/active-record-transactioner.rb', line 66 def queue(model, args = {}) args[:type] ||= :save! @lock.synchronize do klass = model.class validate = args.key?(:validate) ? args[:validate] : true @lock_models[klass] ||= Monitor.new @models[klass] ||= [] @models[klass] << { model: model, type: args.fetch(:type), validate: validate, method_args: args[:method_args] || EMPTY_ARGS } @count += 1 end flush if should_flush? end |
#save!(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
35 36 37 38 |
# File 'lib/active-record-transactioner.rb', line 35 def save!(model) raise ActiveRecord::RecordInvalid, model unless model.valid? queue(model, type: :save!, validate: false) end |
#threadded? ⇒ Boolean
126 127 128 |
# File 'lib/active-record-transactioner.rb', line 126 def threadded? @args[:threadded] end |
#update_column(model, column_name, new_value) ⇒ Object
57 58 59 |
# File 'lib/active-record-transactioner.rb', line 57 def update_column(model, column_name, new_value) update_columns(model, column_name => new_value) end |
#update_columns(model, updates) ⇒ Object
53 54 55 |
# File 'lib/active-record-transactioner.rb', line 53 def update_columns(model, updates) queue(model, type: :update_columns, validate: false, method_args: [updates]) end |