Class: ParallelBatch

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/parallel_batch.rb,
lib/parallel_batch/version.rb

Constant Summary collapse

VERSION =
'0.0.2'.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.find_or_create! ⇒ Object

Class methods



9
10
11
12
13
14
15
# File 'lib/parallel_batch.rb', line 9

# Returns the first existing batch row, creating one when none exists yet.
#
# When starting many batches at the same time we are pretty sure to get a MySQL
# error reporting a duplicated entry, so the lookup is attempted a second time
# (and only a second time) after an ActiveRecord::StatementInvalid.
def self.find_or_create!
  lookup = -> { first || create! }

  begin
    lookup.call
  rescue ActiveRecord::StatementInvalid
    # Single retry: an error raised by this second attempt propagates.
    lookup.call
  end
end

.reset ⇒ Object



28
29
30
# File 'lib/parallel_batch.rb', line 28

# Clears the stored offset on the batch row (creating the row if needed), so
# that the next call to find_records no longer filters by id.
def self.reset
  batch = find_or_create!
  batch.update_attributes!(offset: nil)
end

.start(concurrency = 1) ⇒ Object



17
18
19
# File 'lib/parallel_batch.rb', line 17

# Forks +concurrency+ child processes, each of which runs start_fork, and
# detaches every child so the caller does not have to wait on it.
#
# @param concurrency [Integer] number of children to fork (defaults to 1)
# @return [Integer] the +concurrency+ value, as returned by Integer#times
def self.start(concurrency = 1)
  concurrency.times do
    child_pid = fork { start_fork }
    Process.detach(child_pid)
  end
end

.start_fork ⇒ Object



21
22
23
24
25
26
# File 'lib/parallel_batch.rb', line 21

# Entry point executed inside each child forked by .start: announces the
# child, refreshes the database connection, daemonizes, then runs the batch.
def self.start_fork
  # NOTE(review): Process.daemon below may fork again, so the pid printed here
  # may differ from the pid of the long-running daemon — confirm.
  puts "#{self} has started with pid #{Process.pid}"
  # Re-establish the connection; presumably the one inherited across fork must
  # not be shared with the parent process — confirm.
  ActiveRecord::Base.connection.reconnect!
  # Detach from the controlling terminal. Per the Ruby documentation, a false
  # nochdir argument changes the working directory to "/", and because noclose
  # is not given the standard streams are redirected to /dev/null.
  Process.daemon(false)
  find_or_create!.run
end

Instance Method Details

#batch_size ⇒ Object



63
64
65
# File 'lib/parallel_batch.rb', line 63

# Maximum number of records loaded per batch; find_records passes this value
# to +limit+.
def batch_size
  100
end

#find_records ⇒ Object

Instance methods



36
37
38
# File 'lib/parallel_batch.rb', line 36

# Builds the query for the next batch: records from +scope+ ordered by id and
# capped at +batch_size+, restricted to ids greater than +offset+ when an
# offset has been stored.
def find_records
  relation = scope
  relation = relation.where('id > ?', offset) if offset
  relation.order(:id).limit(batch_size)
end

#next_batch ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/parallel_batch.rb', line 40

# Claims the next batch of records inside a transaction.
#
# The batch row is reloaded with a lock first, then the offset is advanced to
# the id of the last record found before the records are handed back.
#
# @return the records from find_records, or nil when there is no last record
def next_batch
  transaction do
    reload(lock: true)
    records = find_records

    if records.last
      update_attributes!(offset: records.last.id)
      records
    end
  end
end

#perfom(record) ⇒ Object

Raises:

  • (NotImplementedError)


55
56
57
# File 'lib/parallel_batch.rb', line 55

# Hook that processes a single record of a batch; +run+ calls it once per
# record. The base implementation only signals that it has to be overridden.
#
# The method used to be defined under the misspelled name +perfom+, so the
# +perform+ call made by +run+ never reached this default implementation.
#
# @param record the record to process
# @raise [NotImplementedError] always, unless overridden
def perform(record)
  raise NotImplementedError, 'You must override this method to perform your batch.'
end

# Backward-compatible alias for the original, misspelled method name.
alias perfom perform

#run ⇒ Object



49
50
51
52
53
# File 'lib/parallel_batch.rb', line 49

# Processes batches until next_batch returns a falsy value, calling +perform+
# on every record of every batch.
#
# A StandardError raised while performing one record does not stop the run:
# it is reported with +warn+ (instead of being silently discarded) and the
# remaining records are still processed.
def run
  while (records = next_batch)
    records.each do |record|
      begin
        perform(record)
      rescue StandardError => e
        # Best effort: keep going, but leave a trace of what failed.
        warn "#{self.class} failed on #{record.inspect}: #{e.class}: #{e.message}"
      end
    end
  end
end

#scope ⇒ Object

Raises:

  • (NotImplementedError)


59
60
61
# File 'lib/parallel_batch.rb', line 59

# Hook supplying the relation that find_records orders and limits. The base
# implementation only signals that it has to be overridden.
#
# @raise [NotImplementedError] always, unless overridden
def scope
  message = 'You must override this method to scope your records.'
  raise NotImplementedError, message
end