Class: ParallelBatch
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- ParallelBatch
- Defined in:
- lib/parallel_batch.rb,
lib/parallel_batch/version.rb
Constant Summary collapse
- VERSION =
'0.0.2'.freeze
Class Method Summary collapse
-
.find_or_create! ⇒ Object
Class methods.
- .reset ⇒ Object
- .start(concurrency = 1) ⇒ Object
- .start_fork ⇒ Object
Instance Method Summary collapse
- #batch_size ⇒ Object
-
#find_records ⇒ Object
Instance methods.
- #next_batch ⇒ Object
- #perfom(record) ⇒ Object
- #run ⇒ Object
- #scope ⇒ Object
Class Method Details
.find_or_create! ⇒ Object
Class methods.
9 10 11 12 13 14 15 |
# File 'lib/parallel_batch.rb', line 9
#
# Returns the first existing record, creating one when none exists yet.
#
# @return [Object] whatever `first` or `create!` returns
# @raise [ActiveRecord::StatementInvalid] if the single retry fails as well
def self.find_or_create!
  existing = first
  existing || create!
rescue ActiveRecord::StatementInvalid
  # When many batches start at the same time we are pretty sure to get a MySQL
  # error reporting a duplicated entry, so the lookup-or-create is retried
  # one time only.
  retried = first
  retried || create!
end
.reset ⇒ Object
28 29 30 |
# File 'lib/parallel_batch.rb', line 28
#
# Clears the stored offset on the batch record, creating the record first
# when it does not exist yet.
#
# @return [Object] whatever `update_attributes!` returns
def self.reset
  batch = find_or_create!
  batch.update_attributes!(offset: nil)
end
.start(concurrency = 1) ⇒ Object
17 18 19 |
# File 'lib/parallel_batch.rb', line 17
#
# Forks `concurrency` child processes, each of which runs `start_fork`.
#
# @param concurrency [Integer] number of child processes to fork
# @return [Integer] the `concurrency` value, as returned by `Integer#times`
def self.start(concurrency = 1)
  concurrency.times do
    child_pid = fork { start_fork }
    # Detach so the parent does not have to wait on the child itself.
    Process.detach(child_pid)
  end
end
.start_fork ⇒ Object
21 22 23 24 25 26 |
# File 'lib/parallel_batch.rb', line 21
#
# Entry point executed inside each forked child: announces itself, reconnects
# to the database, daemonizes, then runs the batch.
#
# @return [Object] whatever `run` returns
def self.start_fork
  # Printed before daemonizing, so the pid shown is the one of the process
  # at this point.
  puts("#{self} has started with pid #{Process.pid}")

  # NOTE(review): presumably reconnects because the connection was inherited
  # from the parent through fork - confirm.
  ActiveRecord::Base.connection.reconnect!

  Process.daemon(false)

  batch = find_or_create!
  batch.run
end
Instance Method Details
#batch_size ⇒ Object
63 64 65 |
# File 'lib/parallel_batch.rb', line 63
#
# Maximum number of records fetched per batch (used as the query limit by
# `find_records`).
#
# @return [Integer] always 100 in this implementation
def batch_size
  100
end
#find_records ⇒ Object
Instance methods.
36 37 38 |
# File 'lib/parallel_batch.rb', line 36
#
# Builds the query for the next slice of records: at most `batch_size` rows
# from `scope`, ordered by id, and restricted to ids greater than the stored
# offset when an offset is set.
#
# @return [Object] the query built from `scope` (not loaded by this method)
def find_records
  after_id = offset
  candidates = scope
  # Without an offset the scan starts from the lowest id in the scope.
  candidates = candidates.where('id > ?', after_id) if after_id
  candidates.order(:id).limit(batch_size)
end
#next_batch ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/parallel_batch.rb', line 40
#
# Claims the next slice of records: inside a transaction, reloads this row
# with a lock, fetches the records after the current offset and moves the
# offset to the id of the last one.
#
# @return [Object, nil] the claimed records, or nil when none are left
def next_batch
  transaction do
    # Reload with a lock so the offset is read and advanced as one step.
    reload(lock: true)

    records = find_records
    final_record = records.last
    next unless final_record

    update_attributes!(offset: final_record.id)
    records
  end
end
#perfom(record) ⇒ Object
55 56 57 |
# File 'lib/parallel_batch.rb', line 55
#
# Historical (misspelled) name of the per-record hook. It is kept so that
# existing subclasses overriding `perfom` keep working.
#
# @param record [Object] the record to process
# @raise [NotImplementedError] always, unless overridden
def perfom(record)
  raise NotImplementedError, 'You must override this method to perform your batch.'
end

# Per-record hook invoked by `run`. Subclasses override this method (or the
# legacy `perfom`) to do the actual work for one record.
#
# `run` calls `perform`, while only `perfom` used to be defined here; this
# method bridges the two spellings by delegating to `perfom`.
#
# @param record [Object] the record to process
# @return [Object] whatever the overriding implementation returns
# @raise [NotImplementedError] when neither `perform` nor `perfom` is overridden
def perform(record)
  perfom(record)
end
#run ⇒ Object
49 50 51 52 53 |
# File 'lib/parallel_batch.rb', line 49
#
# Processes batches until `next_batch` returns nothing, calling `perform` on
# every record of every batch.
#
# Processing is best-effort: a StandardError raised for one record does not
# stop the run. Instead of being discarded silently, each failure is reported
# on standard error so that it can be noticed.
#
# @return [nil]
def run
  while (records = next_batch)
    records.each do |record|
      begin
        perform(record)
      rescue StandardError => e
        # Keep going with the remaining records, but leave a trace.
        warn "#{self.class}: perform failed (#{e.class}: #{e.message})"
      end
    end
  end
end
#scope ⇒ Object
59 60 61 |
# File 'lib/parallel_batch.rb', line 59
#
# Hook returning the set of records to process. `find_records` chains
# `where`, `order` and `limit` on the returned object. This default
# implementation only raises; an override has to supply the records.
#
# @raise [NotImplementedError] always, unless overridden
def scope
  message = 'You must override this method to scope your records.'
  raise NotImplementedError, message
end