Class: RocketJob::Job
- Inherits:
- Object
- RocketJob::Job
- Includes:
- AASM, MongoMapper::Document, Concerns::Worker, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/job.rb
Overview
The base job from which all jobs are created
Direct Known Subclasses
Class Method Summary
-
.create_indexes ⇒ Object
Create indexes.
-
.pause_all ⇒ Object
Pause all running jobs.
-
.requeue_dead_worker(worker_name) ⇒ Object
Requeue all jobs for the specified dead worker.
-
.resume_all ⇒ Object
Resume all paused jobs.
Instance Method Summary
-
#as_json ⇒ Object
Returns [Hash] status of this job.
-
#collect_output? ⇒ Boolean
Returns [true|false] whether to collect the results from running this batch.
-
#duration ⇒ Object
Returns a human readable duration the job has taken.
-
#expired? ⇒ Boolean
A job has expired if the expiry time has passed before it is started.
-
#load_from_database(*args) ⇒ Object
After this model is read, convert any hashes in the arguments list to HashWithIndifferentAccess.
-
#reload ⇒ Object
Patch the way MongoMapper reloads a model: only reload MongoMapper attributes, leaving other instance variables untouched.
-
#seconds ⇒ Object
Returns [Float] the number of seconds the job has taken: the elapsed time from when a worker first started working on it until now if still running, or until it was completed; or the seconds spent in the queue if still queued.
-
#seconds_to_delay(count) ⇒ Object
TODO Jobs are not currently automatically retried.
-
#status(time_zone = 'Eastern Time (US & Canada)') ⇒ Object
Methods included from Concerns::Worker
included, #rocket_job_csv_parser, #work
Class Method Details
.create_indexes ⇒ Object
Create indexes
# File 'lib/rocket_job/job.rb', line 190
def self.create_indexes
  # Used by find_and_modify in .next_job
  ensure_index({ state: 1, run_at: 1, priority: 1, created_at: 1, sub_state: 1 }, background: true)
  # Remove outdated index if present
  drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil
  # Used by Mission Control
  ensure_index [[:created_at, 1]]
end
.pause_all ⇒ Object
Pause all running jobs
# File 'lib/rocket_job/job.rb', line 209
def self.pause_all
  where(state: 'running').each { |job| job.pause! }
end
.requeue_dead_worker(worker_name) ⇒ Object
Requeue all jobs for the specified dead worker
# File 'lib/rocket_job/job.rb', line 200
def self.requeue_dead_worker(worker_name)
  collection.update(
    { 'worker_name' => worker_name, 'state' => :running },
    { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
    multi: true
  )
end
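As an illustration of what the update in `.requeue_dead_worker` does to matching documents, the following sketch applies the same selector and modifiers to plain Ruby hashes. It does not talk to MongoDB; `requeue_in_memory`, the sample documents, and the worker name format are all hypothetical.

```ruby
# Plain-hash stand-ins for job documents (assumed shape, not real MongoDB documents).
jobs = [
  { 'worker_name' => 'host1:123', 'state' => :running, 'started_at' => Time.at(0) },
  { 'worker_name' => 'host2:456', 'state' => :running, 'started_at' => Time.at(0) },
  { 'worker_name' => 'host1:123', 'state' => :completed }
]

# Hypothetical in-memory equivalent of the multi-document update.
def requeue_in_memory(jobs, worker_name)
  jobs.each do |job|
    # Selector: running jobs that belong to the dead worker
    next unless job['worker_name'] == worker_name && job['state'] == :running

    job.delete('worker_name') # corresponds to $unset
    job.delete('started_at')  # corresponds to $unset
    job['state'] = :queued    # corresponds to $set
  end
end

requeue_in_memory(jobs, 'host1:123')
jobs.each { |job| puts job.inspect }
```

Only the running job owned by the named worker returns to the queue; jobs owned by other workers and jobs in other states are left untouched.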
.resume_all ⇒ Object
Resume all paused jobs
# File 'lib/rocket_job/job.rb', line 214
def self.resume_all
  where(state: 'paused').each { |job| job.resume! }
end
Instance Method Details
#as_json ⇒ Object
Returns [Hash] status of this job
# File 'lib/rocket_job/job.rb', line 248
def as_json
  attrs = serializable_hash(methods: [:seconds, :duration])
  attrs.delete('result') unless collect_output?
  case
  when running?
    attrs.delete('completed_at')
    attrs.delete('result')
    attrs
  when paused?
    attrs.delete('completed_at')
    attrs.delete('result')
    # Ensure 'paused_at' appears first in the hash
    { 'paused_at' => completed_at }.merge(attrs)
  when aborted?
    attrs.delete('completed_at')
    attrs.delete('result')
    { 'aborted_at' => completed_at }.merge(attrs)
  when failed?
    attrs.delete('completed_at')
    attrs.delete('result')
    { 'failed_at' => completed_at }.merge(attrs)
  else
    attrs
  end
end
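The paused, aborted, and failed branches of `#as_json` rely on Ruby hashes preserving insertion order: merging the remaining attributes into a one-entry hash puts the renamed timestamp first. A minimal sketch with made-up attribute values (the keys and values below are assumptions, not output from a real job):

```ruby
# Made-up attributes standing in for serializable_hash output.
attrs = { 'state' => :paused, 'priority' => 50, 'completed_at' => Time.at(0) }

# Remove the generic timestamp, then re-add it under a state-specific key.
# Here the deleted value is reused; the original reads completed_at from the job.
timestamp = attrs.delete('completed_at')
result    = { 'paused_at' => timestamp }.merge(attrs)

puts result.keys.inspect
```

Because the receiver of `merge` contributes its keys first, `'paused_at'` leads the resulting hash and `'completed_at'` no longer appears.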
#collect_output? ⇒ Boolean
Returns [true|false] whether to collect the results from running this batch
# File 'lib/rocket_job/job.rb', line 219
def collect_output?
  collect_output == true
end
#duration ⇒ Object
Returns a human readable duration the job has taken
# File 'lib/rocket_job/job.rb', line 238
def duration
  seconds_as_duration(seconds)
end
#expired? ⇒ Boolean
A job has expired if the expiry time has passed before it is started
# File 'lib/rocket_job/job.rb', line 243
def expired?
  started_at.nil? && expires_at && (expires_at < Time.now)
end
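The expiry rule can be tried out in isolation with a small sketch. `PendingJob` and `expired_at?` are hypothetical names used only for illustration, and the current time is passed in so the outcome is reproducible.

```ruby
# Hypothetical stand-in for a job; only the two timestamps used by the check.
PendingJob = Struct.new(:started_at, :expires_at)

# Same condition as Job#expired?, with the clock passed in explicitly.
# Unlike the original, this always returns true or false, never nil.
def expired_at?(job, now)
  job.started_at.nil? && !job.expires_at.nil? && job.expires_at < now
end

now = Time.at(2_000)

never_expires = PendingJob.new(nil, nil)                      # no expiry set
still_valid   = PendingJob.new(nil, Time.at(3_000))           # expiry in the future
missed_window = PendingJob.new(nil, Time.at(1_000))           # expiry passed, never started
started_late  = PendingJob.new(Time.at(1_500), Time.at(1_000)) # already started

[never_expires, still_valid, missed_window, started_late].each do |job|
  puts expired_at?(job, now)
end
```

Note that a job which has already started is never reported as expired, even if its expiry time has since passed.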
#load_from_database(*args) ⇒ Object
After this model is read, convert any hashes in the arguments list to HashWithIndifferentAccess
# File 'lib/rocket_job/job.rb', line 308
def load_from_database(*args)
  super
  self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } if arguments.present?
end
#reload ⇒ Object
Patch the way MongoMapper reloads a model: only reload MongoMapper attributes, leaving other instance variables untouched
# File 'lib/rocket_job/job.rb', line 298
def reload
  if doc = collection.find_one(:_id => id)
    load_from_database(doc)
    self
  else
    raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
  end
end
#seconds ⇒ Object
Returns [Float] the number of seconds the job has taken:
- Elapsed seconds to process the job from when a worker first started working on it until now if still running, or until it was completed
- Seconds in the queue if queued
# File 'lib/rocket_job/job.rb', line 227
def seconds
  if completed_at
    completed_at - (started_at || created_at)
  elsif started_at
    Time.now - started_at
  else
    Time.now - created_at
  end
end
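To see how the three cases of `#seconds` play out, here is a standalone sketch of the same branching. `JobTimes` and `elapsed_seconds` are hypothetical names, the timestamps are arbitrary, and the current time is passed in rather than read from `Time.now` so the results are reproducible.

```ruby
# Hypothetical stand-in for a job record; only the three timestamps matter here.
JobTimes = Struct.new(:created_at, :started_at, :completed_at)

# Same branching as Job#seconds, with the current time supplied by the caller.
def elapsed_seconds(job, now)
  if job.completed_at
    job.completed_at - (job.started_at || job.created_at)
  elsif job.started_at
    now - job.started_at
  else
    now - job.created_at
  end
end

created = Time.at(1_000)
now     = Time.at(1_100)

queued    = JobTimes.new(created, nil, nil)
running   = JobTimes.new(created, Time.at(1_040), nil)
completed = JobTimes.new(created, Time.at(1_040), Time.at(1_070))

puts elapsed_seconds(queued, now)    # time spent waiting in the queue
puts elapsed_seconds(running, now)   # time since a worker started on it
puts elapsed_seconds(completed, now) # time from start to completion
```

Subtracting one `Time` from another yields a Float number of seconds, which matches the documented return type.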
#seconds_to_delay(count) ⇒ Object
TODO Jobs are not currently automatically retried. Is there a need?
# File 'lib/rocket_job/job.rb', line 290
def seconds_to_delay(count)
  # TODO Consider lowering the priority automatically after every retry?
  # Same basic formula for calculating retry interval as delayed_job and Sidekiq
  (count ** 4) + 15 + (rand(30) * (count + 1))
end
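To give a feel for how quickly the retry interval grows, the sketch below computes the smallest and largest delay the formula can produce for a given retry count. `retry_delay_bounds` is a hypothetical helper, not part of RocketJob; it relies only on the fact that `rand(30)` returns an integer from 0 to 29.

```ruby
# Hypothetical helper, not part of RocketJob: bounds of the retry delay
# formula (count ** 4) + 15 + (rand(30) * (count + 1)).
def retry_delay_bounds(count)
  base = (count ** 4) + 15
  # rand(30) is an Integer in 0..29, so the jitter term is at most 29 * (count + 1)
  [base, base + 29 * (count + 1)]
end

(0..5).each do |count|
  min, max = retry_delay_bounds(count)
  puts "retry #{count}: between #{min} and #{max} seconds"
end
```

The fourth-power term dominates after a few retries, while the random term spreads out jobs that failed at the same time so they do not all retry together.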
#status(time_zone = 'Eastern Time (US & Canada)') ⇒ Object
# File 'lib/rocket_job/job.rb', line 274
def status(time_zone='Eastern Time (US & Canada)')
  h = as_json
  h.delete('seconds')
  h.delete('perform_method') if h['perform_method'] == :perform
  h.dup.each_pair do |k,v|
    case
    when v.kind_of?(Time)
      h[k] = v.in_time_zone(time_zone).to_s
    when v.kind_of?(BSON::ObjectId)
      h[k] = v.to_s
    end
  end
  h
end