Class: ActiveRecordTransactioner
- Inherits:
-
Object
- Object
- ActiveRecordTransactioner
- Defined in:
- lib/active-record-transactioner.rb
Constant Summary collapse
- DEFAULT_ARGS =
{ :call_args => [], :call_method => :save!, :transaction_method => :transaction, :transaction_size => 1000 }
- ALLOWED_ARGS =
DEFAULT_ARGS.keys
Instance Method Summary collapse
-
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
-
#initialize(args = {}) ⇒ ActiveRecordTransactioner
constructor
A new instance of ActiveRecordTransactioner.
-
#join ⇒ Object
Waits for any remaining running threads.
-
#queue(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
Constructor Details
#initialize(args = {}) ⇒ ActiveRecordTransactioner
Returns a new instance of ActiveRecordTransactioner.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/active-record-transactioner.rb', line 13 def initialize(args = {}) args.each do |key, val| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) end @args = DEFAULT_ARGS.merge(args) @models = {} @threads = [] @count = 0 @lock = Monitor.new @lock_threads = Monitor.new @lock_models = {} if block_given? begin yield self ensure flush join end end end |
Instance Method Details
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/active-record-transactioner.rb', line 50 def flush threads = [] @lock.synchronize do @models.each do |klass, val| next if val.empty? models = val @models[klass] = [] @count -= models.length thread = nil @lock_models[klass].synchronize do thread = Thread.new do begin @lock_models[klass].synchronize do klass.__send__(@args[:transaction_method]) do models.each do |model| model.__send__(@args[:call_method], *@args[:call_args]) end end end rescue => e puts e.inspect puts e.backtrace if e.is_a?(NoMethodError) and e..to_s.include?("`reverse' for nil:NilClass") puts puts "Warning: Known Rails reverse error when using transaction - retrying in 2 sec." sleep 2 puts "Retrying" puts retry end ensure @threads.delete(Thread.current) end end end @lock_threads.synchronize do threads << thread @threads << thread end end end return { :threads => threads } end |
#join ⇒ Object
Waits for any remaining running threads.
103 104 105 106 107 108 109 |
# File 'lib/active-record-transactioner.rb', line 103 def join @lock_threads.synchronize do @threads.each do |thread| thread.join end end end |
#queue(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/active-record-transactioner.rb', line 37 def queue(model) @lock.synchronize do klass = model.class @lock_models[klass] = Mutex.new if !@lock_models.key?(klass) @models[klass] = [] if !@models.key?(klass) @models[klass] << model @count += 1 flush if @count >= @args[:transaction_size] end end |