Class: RocketJob::Job

Inherits:
Object
  • Object
show all
Includes:
AASM, MongoMapper::Document, Concerns::Worker, SemanticLogger::Loggable
Defined in:
lib/rocket_job/job.rb

Overview

The base job from which all jobs are created

Direct Known Subclasses

RocketJob::Jobs::DirmonJob

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Concerns::Worker

included, #rocket_job_csv_parser, #work

Class Method Details

.create_indexesObject

Create indexes



190
191
192
193
194
195
196
197
# File 'lib/rocket_job/job.rb', line 190

def self.create_indexes
  # Used by find_and_modify in .next_job
  ensure_index({ state:1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true)
  # Remove outdated index if present
  drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil
  # Used by Mission Control
  ensure_index [[:created_at, 1]]
end

.pause_allObject

Pause all running jobs



209
210
211
# File 'lib/rocket_job/job.rb', line 209

def self.pause_all
  where(state: 'running').each { |job| job.pause! }
end

.requeue_dead_worker(worker_name) ⇒ Object

Requeue all jobs for the specified dead worker



200
201
202
203
204
205
206
# File 'lib/rocket_job/job.rb', line 200

def self.requeue_dead_worker(worker_name)
  collection.update(
    { 'worker_name' => worker_name, 'state' => :running },
    { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
    multi: true
  )
end

.resume_allObject

Resume all paused jobs



214
215
216
# File 'lib/rocket_job/job.rb', line 214

def self.resume_all
  where(state: 'paused').each { |job| job.resume! }
end

Instance Method Details

#as_jsonObject

Returns [Hash] status of this job



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/rocket_job/job.rb', line 248

def as_json
  attrs = serializable_hash(methods: [:seconds, :duration])
  attrs.delete('result') unless collect_output?
  case
  when running?
    attrs.delete('completed_at')
    attrs.delete('result')
    attrs
  when paused?
    attrs.delete('completed_at')
    attrs.delete('result')
    # Ensure 'paused_at' appears first in the hash
    { 'paused_at' => completed_at }.merge(attrs)
  when aborted?
    attrs.delete('completed_at')
    attrs.delete('result')
    { 'aborted_at' => completed_at }.merge(attrs)
  when failed?
    attrs.delete('completed_at')
    attrs.delete('result')
    { 'failed_at' => completed_at }.merge(attrs)
  else
    attrs
  end
end

#collect_output?Boolean

Returns [true|false] whether to collect the results from running this batch

Returns:

  • (Boolean)


219
220
221
# File 'lib/rocket_job/job.rb', line 219

def collect_output?
  collect_output == true
end

#durationObject

Returns a human readable duration the job has taken



238
239
240
# File 'lib/rocket_job/job.rb', line 238

def duration
  seconds_as_duration(seconds)
end

#expired?Boolean

A job has expired if the expiry time has passed before it is started

Returns:

  • (Boolean)


243
244
245
# File 'lib/rocket_job/job.rb', line 243

def expired?
  started_at.nil? && expires_at && (expires_at < Time.now)
end

#load_from_database(*args) ⇒ Object

After this model is read, convert any hashes in the arguments list to HashWithIndifferentAccess



308
309
310
311
# File 'lib/rocket_job/job.rb', line 308

def load_from_database(*args)
  super
  self.arguments = arguments.collect {|i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i  } if arguments.present?
end

#reloadObject

Patch the way MongoMapper reloads a model Only reload MongoMapper attributes, leaving other instance variables untouched



298
299
300
301
302
303
304
305
# File 'lib/rocket_job/job.rb', line 298

def reload
  if doc = collection.find_one(:_id => id)
    load_from_database(doc)
    self
  else
    raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
  end
end

#secondsObject

Returns [Float] the number of seconds the job has taken

  • Elapsed seconds to process the job from when a worker first started working on it until now if still running, or until it was completed

  • Seconds in the queue if queued



227
228
229
230
231
232
233
234
235
# File 'lib/rocket_job/job.rb', line 227

def seconds
  if completed_at
    completed_at - (started_at || created_at)
  elsif started_at
    Time.now - started_at
  else
    Time.now - created_at
  end
end

#seconds_to_delay(count) ⇒ Object

TODO Jobs are not currently automatically retried. Is there a need?



290
291
292
293
294
# File 'lib/rocket_job/job.rb', line 290

def seconds_to_delay(count)
  # TODO Consider lowering the priority automatically after every retry?
  # Same basic formula for calculating retry interval as delayed_job and Sidekiq
  (count ** 4) + 15 + (rand(30)*(count+1))
end

#status(time_zone = 'Eastern Time (US & Canada)') ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/rocket_job/job.rb', line 274

def status(time_zone='Eastern Time (US & Canada)')
  h = as_json
  h.delete('seconds')
  h.delete('perform_method') if h['perform_method'] == :perform
  h.dup.each_pair do |k,v|
    case
    when v.kind_of?(Time)
      h[k] = v.in_time_zone(time_zone).to_s
    when v.kind_of?(BSON::ObjectId)
      h[k] = v.to_s
    end
  end
  h
end