Class: Steve::QueuedJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Steve::QueuedJob
- Defined in:
- lib/steve/queued_job.rb
Class Method Summary collapse
-
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database.
-
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue.
-
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing.
Instance Method Summary collapse
-
#archive_job? ⇒ Boolean
Whether the job is in a state appropriate for archiving (failed or completed).
-
#associate_with(object) ⇒ Object
Associate this job with the passed Active Record object.
-
#associated_object ⇒ Object
A job can belong to another Active Record object (polymorphic association).
-
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified.
-
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
-
#fail!(message) ⇒ Object
Mark this job as failed.
-
#finish! ⇒ Object
Mark this job as finished.
-
#lock ⇒ Object
Get a lock on this job.
-
#output ⇒ Object
Get the output of a job, even if the job is in progress. Live output won’t work very well in a multiple app-server environment.
- #output_file ⇒ Object
-
#params ⇒ Object
Serialize the options.
-
#pending ⇒ Object
Scopes.
-
#start! ⇒ Object
Mark this job as started.
-
#success! ⇒ Object
Mark this job as completed successfully.
Class Method Details
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database
207 208 209 |
# File 'lib/steve/queued_job.rb', line 207
# Delete every job whose status is 'completed' and whose run_at is earlier
# than +age+ (defaults to five days ago).
def self.cleanup(age = 5.days.ago)
  stale_completed = ["status = 'completed' and run_at < ?", age]
  delete_all(stale_completed)
end
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue. Returns an array of the jobs that were executed; the array is empty if no jobs were found or we couldn’t obtain locks for them.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/steve/queued_job.rb', line 46
# Execute pending jobs from the queue.
#
# Selects up to +limit+ unlocked jobs with status 'pending' or 'delayed' whose
# run_at is not in the future, ordered by priority. When +queue+ is neither
# nil nor '*', only jobs on that queue are considered. Candidates are visited
# in random order; each job that can be locked is run in a forked child
# process while the parent waits for it to exit.
#
# queue - name of the queue to draw from, or '*' / nil for any queue.
# limit - maximum number of candidate jobs to load.
#
# Returns the Array of jobs that were locked and executed (empty when none).
def self.execute_jobs(queue = '*', limit = 5)
  pending_jobs = ActiveRecord::Base.silence do
    jobs = self.where(:status => ['pending', 'delayed'], :worker => nil)
    jobs = jobs.where(["run_at <= ?", Time.now.utc]).order("priority asc")
    # Honour the caller-supplied limit; it used to be hard-coded to 5.
    jobs = jobs.limit(limit)
    jobs = jobs.where(:queue => queue) unless queue.nil? || queue == '*'
    jobs.all
  end

  jobs_executed = []
  pending_jobs.sort_by { rand }.each do |job|
    Steve.log "[#{job.id}] Attempt to aquire lock"
    unless job.lock
      Steve.log "[#{job.id}] Lock could not be acquired. Moving on."
      next
    end

    Steve.log "[#{job.id}] Lock acquired"
    # Drop the connection before forking; parent and child each reconnect.
    ActiveRecord::Base.remove_connection
    if @child = fork
      # Parent process.
      # NOTE(review): presumably advances the parent's RNG after the fork - confirm.
      rand
      Steve.log "[#{job.id}] Forked to #{@child}"
      $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name
      ActiveRecord::Base.establish_connection
      Process.wait
    else
      # Child process: run the job, then exit so the loop continues only in the parent.
      Steve.log "[#{job.id}] Executing"
      $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}"
      ActiveRecord::Base.establish_connection
      Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc)
      job.execute
      exit
    end
    jobs_executed << job
  end
  jobs_executed
end
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing. Returns true or false depending whether the job has been queued or not.
34 35 36 37 38 39 40 41 42 |
# File 'lib/steve/queued_job.rb', line 34
# Build and save a new queued job for +klass+.
#
# The job name is klass.to_s, and the queue and priority are read from the
# @queue and @priority instance variables of +klass+. When a block is given
# it receives the unsaved job so the caller can adjust it before saving.
#
# Returns the result of saving the job.
def self.queue(klass, params = {}, &block)
  new_job = new
  new_job.job = klass.to_s
  new_job.params = params
  new_job.queue = klass.instance_variable_get('@queue')
  new_job.priority = klass.instance_variable_get('@priority')
  block.call(new_job) if block
  new_job.save
end
Instance Method Details
#archive_job? ⇒ Boolean
Whether the job is in a state appropriate for archiving (failed or completed).
198 199 200 |
# File 'lib/steve/queued_job.rb', line 198
# True when the job's status is 'failed' or 'completed'.
def archive_job?
  %w[failed completed].include?(status)
end
#associate_with(object) ⇒ Object
Associate this job with the passed Active Record object
192 193 194 195 |
# File 'lib/steve/queued_job.rb', line 192
# Point the associated_object association at +object+ and save the job
# without running validations. Returns the result of the save.
def associate_with(object)
  self.associated_object = object
  save(:validate => false)
end
#associated_object ⇒ Object
A job can belong to another Active Record object (polymorphic association)
13 |
# File 'lib/steve/queued_job.rb', line 13
# Polymorphic association: a job may belong to any other record.
belongs_to(:associated_object, :polymorphic => true)
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified
182 183 184 185 186 187 188 189 |
# File 'lib/steve/queued_job.rb', line 182
# Reschedule this job to run +delay_time+ from now.
#
# Sets the status to 'delayed', clears started_at and the worker (releasing
# the lock), bumps the retry counter and saves without validations.
def delay!(delay_time = 30.seconds)
  self.retries += 1
  self.worker = nil
  self.started_at = nil
  self.run_at = Time.now.utc + delay_time
  self.status = 'delayed'
  save(:validate => false)
end
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/steve/queued_job.rb', line 83
# Execute this job, catching any errors and ensuring the job is started and
# finished as appropriate.
#
# The job class named by +job+ is instantiated with this record. If the
# instance responds to +perform+, STDOUT and STDERR are redirected to
# output_file, and the outcome is recorded as follows:
#
# * no exception           -> success!
# * Steve::Job::Delay      -> delay!, or fail! once retries has reached
#                             Steve.max_job_retries (5 when that is nil)
# * Timeout::Error         -> fail!('Timed out')
# * Steve::Job::Error      -> fail! with the exception message
# * any other StandardError -> Airbrake notification (when Airbrake is
#                             defined) and fail! with message and backtrace
#
# Whatever happens, the captured output file (if present) is copied into the
# output attribute and removed, and the record is saved without validations.
#
# Returns false when the job object does not respond to +perform+.
def execute
  job = self.job.constantize.new(self)
  if job.respond_to?(:perform)
    start!
    # Capture everything the job prints in the per-job output file.
    STDOUT.reopen(output_file) && STDERR.reopen(output_file)
    begin
      job.perform
      success!
      Steve.log "[#{self.id}] Succeeded"
    rescue Steve::Job::Delay => e
      max_attempts = (Steve.max_job_retries || 5)
      if self.retries >= max_attempts
        fail!("#{e.message} after #{max_attempts} attempt(s)")
        Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.message}"
      else
        self.error = e.message
        delay!
        Steve.log "[#{self.id}] Delayed ('#{e.message}')"
      end
    rescue Timeout::Error => e
      # Bind the exception so the log line below can report it.
      fail!('Timed out')
      Steve.log "[#{self.id}] Timed out: #{e.to_s}"
    rescue => e
      if e.is_a?(Steve::Job::Error)
        fail!(e.message)
      else
        if defined?(Airbrake)
          Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params)
        end
        fail!([e.to_s, e.backtrace].join("\n"))
      end
      Steve.log "[#{self.id}] Failed: #{e.to_s}"
    end
  else
    fail! "#{self.id} did not respond to 'perform'"
    Steve.log "[#{self.id}] Failed: does not respond to 'perform'"
    return false
  end
ensure
  STDOUT.flush && STDERR.flush
  # Persist the captured output on the record and remove the temporary file.
  if File.exist?(output_file)
    self.output = File.read(output_file)
    FileUtils.rm(output_file)
  end
  self.save(:validate => false)
end
#fail!(message) ⇒ Object
Mark this job as failed.
160 161 162 163 164 |
# File 'lib/steve/queued_job.rb', line 160
# Mark this job as failed.
#
# message - text describing the failure; stored in the error attribute.
#
# Sets the status to 'failed' and stamps the finish time via finish!.
# Does not save the record.
def fail!(message)
  self.error = message
  self.status = 'failed'
  finish!
end
#finish! ⇒ Object
Mark this job as finished
167 168 169 |
# File 'lib/steve/queued_job.rb', line 167
# Stamp finished_at with the current UTC time. Does not save the record.
def finish!
  now_utc = Time.now.utc
  self.finished_at = now_utc
end
#lock ⇒ Object
Get a lock on this job. Returns true if the lock was successful otherwise it returns false.
143 144 145 146 147 148 149 150 151 |
# File 'lib/steve/queued_job.rb', line 143
# Try to claim this job for the current worker.
#
# Issues a conditional update that sets worker to Steve.worker_name only on
# the row with this id whose worker is still nil. Exactly one updated row
# means the claim succeeded.
#
# Returns true when the lock was obtained, false otherwise.
def lock
  claim = { :worker => Steve.worker_name }
  unclaimed_self = { :id => id, :worker => nil }
  updated = self.class.update_all(claim, unclaimed_self)
  return false unless updated == 1

  self.worker = Steve.worker_name
  true
end
#output ⇒ Object
Get the output of a job, even if the job is in progress. Live output won’t work very well in a multiple app-server environment.
133 134 135 136 137 138 139 |
# File 'lib/steve/queued_job.rb', line 133
# Return the job's output.
#
# Prefers the stored output attribute; otherwise falls back to the contents
# of output_file when that file exists. Returns nil when neither is available.
def output
  stored = read_attribute(:output)
  return stored if stored

  File.read(output_file) if File.exist?(output_file)
end
#output_file ⇒ Object
202 203 204 |
# File 'lib/steve/queued_job.rb', line 202
# Path of the file that captures this job's output: "steve-job-<id>" under
# the root-level tmp directory. The path is memoized on first use.
def output_file
  return @output_file if @output_file

  @output_file = File.join('', 'tmp', "steve-job-#{id}")
end
#params ⇒ Object
Serialize the options
10 |
# File 'lib/steve/queued_job.rb', line 10
# Store the params attribute in serialized form.
serialize(:params)
#pending ⇒ Object
Scopes
16 |
# File 'lib/steve/queued_job.rb', line 16
# Scope: jobs whose status is 'pending' or 'delayed'.
scope(:pending, lambda { where(:status => %w[pending delayed]) })
#start! ⇒ Object
Mark this job as started
172 173 174 175 176 177 178 179 |
# File 'lib/steve/queued_job.rb', line 172
# Mark this job as started.
#
# Records the current process id and UTC start time, sets the status to
# 'running', clears any previous error and finish time, then saves without
# validations.
def start!
  self.job_pid = Process.pid
  self.status = 'running'
  self.started_at = Time.now.utc
  self.finished_at = nil
  self.error = nil
  save(:validate => false)
end
#success! ⇒ Object
Mark this job as completed successfully.
154 155 156 157 |
# File 'lib/steve/queued_job.rb', line 154
# Mark this job as completed: sets the status to 'completed' and stamps the
# finish time via finish!. Does not save the record.
def success!
  self.status = 'completed'
  finish!
end