Class: Qwirk::Batch::ActiveRecord::BatchJob

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
JobStatus
Defined in:
lib/qwirk/batch/active_record/batch_job.rb

Constant Summary

Constants included from JobStatus

JobStatus::ABORTED, JobStatus::CANCELED, JobStatus::FINISHED, JobStatus::INITED, JobStatus::PAUSED, JobStatus::RUNNING, JobStatus::STATUSES

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ BatchJob

Returns a new instance of BatchJob.



41
42
43
44
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 41

# Builds a BatchJob: hands +opts+ to the superclass constructor, then starts
# with an empty in-memory list of outstanding file positions.
#
# @param opts [Hash] passed through unchanged to the superclass constructor
def initialize(opts = {})
  super(opts)
  @outstanding_array = Array.new
end

Class Method Details

.acquire(file, worker_name) ⇒ Object

Acquire this file if it hasn’t already been acquired.



20
21
22
23
24
25
26
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 20

# Claims +file+ for +worker_name+ unless a matching row already exists.
#
# Any ActiveRecord error raised while looking up or creating the row is
# logged as a presumed duplicate-index race and treated as "not acquired".
#
# @param file [Object] value stored in the job's +file+ attribute
# @param worker_name [Object] value stored in the job's +worker_name+ attribute
# @return [BatchJob, nil] the newly created job, or nil when the file was
#   already taken or an ActiveRecord error occurred
def self.acquire(file, worker_name)
  already_taken = find_by_file_and_worker_name(file, worker_name)
  if already_taken
    nil
  else
    create!(:file => file, :worker_name => worker_name)
  end
rescue ActiveRecord::ActiveRecordError => e
  Rails.logger.warn("Assuming race condition (duplicate index) for BatchJob file=#{file} worker=#{worker_name}: #{e.message}")
  nil
end

.resume_paused_job(worker_name) ⇒ Object

Acquire and resume a paused job if available



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 29

# Acquires and resumes a paused job for +worker_name+, if one exists.
#
# Inside a transaction the first PAUSED job for the worker is loaded with a
# row lock. Each of its saved outstanding records is fed back through
# +start_record+ and then destroyed, and the job's status is set to RUNNING.
#
# @param worker_name [Object] worker whose paused job should be resumed
# @return [BatchJob, nil] the resumed job, or nil when no paused job was found
def self.resume_paused_job(worker_name)
  transaction do
    # The condition key must be the :status symbol; a bare `status` would be
    # evaluated as a method call on the class.
    job = where(:worker_name => worker_name, :status => PAUSED).lock(true).first
    # `next` leaves only the block, so the transaction finishes normally.
    next nil unless job
    job.outstanding_records.each do |record|
      job.start_record(record.file_position)
      record.destroy
    end
    # update_attribute takes (name, value), not a hash.
    job.update_attribute(:status, RUNNING)
    job
  end
end

Instance Method Details

#abort ⇒ Object



55
56
57
58
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 55

# Persists the in-memory outstanding positions (via +save_outstanding_array+)
# and marks the job ABORTED.
#
# @return [Object] the result of +update_attribute+
def abort
  save_outstanding_array
  # update_attribute takes (name, value), not a hash.
  update_attribute(:status, ABORTED)
end

#cancel ⇒ Object



60
61
62
63
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 60

# Persists the in-memory outstanding positions (via +save_outstanding_array+)
# and marks the job CANCELED.
#
# @return [Object] the result of +update_attribute+
def cancel
  save_outstanding_array
  # update_attribute takes (name, value), not a hash.
  update_attribute(:status, CANCELED)
end

#fail_record(file_position, message) ⇒ Object



78
79
80
81
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 78

# Marks the record at +file_position+ as failed: removes it from the
# in-memory outstanding list and creates a failed record carrying +message+.
#
# @param file_position [Object] position of the record within the file
# @param message [Object] failure description stored on the failed record
# @return [Object] the result of +failed_records.create!+
def fail_record(file_position, message)
  # Instances loaded from the database do not go through #initialize, so the
  # list may not exist yet; create it lazily instead of calling delete on nil.
  (@outstanding_array ||= []).delete(file_position)
  failed_records.create!(:file_position => file_position, :message => message)
end

#failed_hash ⇒ Object



94
95
96
97
98
99
100
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 94

# Summarises the failed records as a hash.
#
# @return [Hash] maps each failed record's +file_position+ to its +message+;
#   when a position occurs more than once the later record wins
def failed_hash
  failed_records.each_with_object({}) do |entry, by_position|
    by_position[entry.file_position] = entry.message
  end
end

#finish ⇒ Object



65
66
67
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 65

# Marks the job FINISHED.
#
# @return [Object] the result of +update_attribute+
def finish
  # update_attribute takes (name, value), not a hash.
  update_attribute(:status, FINISHED)
end

#finish_record(file_position) ⇒ Object



73
74
75
76
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 73

# Marks the record at +file_position+ as done: removes it from the in-memory
# outstanding list and increases +finished_count+ by one.
#
# @param file_position [Object] position of the record within the file
# @return [Object] the result of +update_attribute+
def finish_record(file_position)
  # Instances loaded from the database do not go through #initialize, so the
  # list may not exist yet; create it lazily instead of calling delete on nil.
  (@outstanding_array ||= []).delete(file_position)
  # update_attribute takes (name, value), not a hash.
  update_attribute(:finished_count, finished_count + 1)
end

#outstanding_array ⇒ Object



90
91
92
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 90

# In-memory list of file positions that have been started but not yet
# finished or failed.
#
# @return [Array] the list; created empty on first access when it has not
#   been set up yet (e.g. on an instance that did not go through #initialize)
def outstanding_array
  @outstanding_array ||= []
end

#pause ⇒ Object



50
51
52
53
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 50

# Persists the in-memory outstanding positions (via +save_outstanding_array+)
# and marks the job PAUSED.
#
# @return [Object] the result of +update_attribute+
def pause
  save_outstanding_array
  # PAUSED is the JobStatus constant that resume_paused_job searches for;
  # update_attribute takes (name, value), not a hash.
  update_attribute(:status, PAUSED)
end

#retry_failed_record ⇒ Object



83
84
85
86
87
88
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 83

# Takes the first failed record, destroys it, and hands back its position so
# it can be processed again.
#
# @return [Object, nil] the +file_position+ of the destroyed failed record,
#   or nil when there are no failed records
def retry_failed_record
  candidate = failed_records.first
  if candidate
    candidate.destroy
    candidate.file_position
  end
end

#run(total_count) ⇒ Object



46
47
48
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 46

# Marks the job RUNNING and records how many records it is expected to
# process.
#
# @param total_count [Object] value stored in the +total_count+ attribute
# @return [Object] the result of +update_attributes+
def run(total_count)
  changes = { :status => RUNNING, :total_count => total_count }
  update_attributes(changes)
end

#start_record(file_position) ⇒ Object



69
70
71
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 69

# Registers +file_position+ as outstanding (started but not yet finished or
# failed).
#
# @param file_position [Object] position of the record within the file
# @return [Array] the in-memory outstanding list after the append
def start_record(file_position)
  # Instances loaded from the database (as in resume_paused_job) do not go
  # through #initialize, so create the list lazily instead of appending to nil.
  (@outstanding_array ||= []) << file_position
end