Class: Delayed::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
MongoMapper::Document
Defined in:
lib/delayed/job/mongo_job.rb,
lib/delayed/job.rb,
lib/delayed/job/active_record_job.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Constant Summary collapse

MAX_ATTEMPTS =
25
MAX_RUN_TIME =
4.hours
ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/
NextTaskSQL =
'(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder =
'priority DESC, run_at ASC'

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_locks!Object

When a worker is exiting, make sure we don’t have any locked jobs.



29
30
31
# File 'lib/delayed/job.rb', line 29

def self.clear_locks!
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

.enqueue(*args, &block) ⇒ Object

Add a job to the queue



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/delayed/job/mongo_job.rb', line 72

def self.enqueue(*args, &block)
  object = block_given? ? EvaledJob.new(&block) : args.shift

  unless object.respond_to?(:perform) || block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  priority = args.first || 0
  run_at   = args[1]

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

.find_available(limit = 5, max_run_time = MAX_RUN_TIME) ⇒ Object

Find a few candidate jobs to run (in case some immediately get locked by others). Return in random order prevent everyone trying to do same head job at once.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/delayed/job/mongo_job.rb', line 87

def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
  time_now = db_time_now

  conditions = {}

  if self.min_priority
    conditions["priority"] ||= {}
    conditions["priority"].merge!("$gte" => min_priority)
  end

  if self.max_priority
    conditions["priority"] ||= {}
    conditions["priority"].merge!("$lte" => max_priority)
  end

  overtime = make_date(time_now - max_run_time.to_i)
  query = "(this.run_at <= #{make_date(time_now)} && (this.locked_at == null || this.locked_at < #{overtime}) || this.locked_by == '#{worker_name}') && this.failed_at == null"

  conditions.merge!("$where" => make_query(query))

  records = collection.find(conditions, {:sort => [['priority', 'descending'], ['run_at', 'ascending']], :limit => limit}).map {|x| new(x)} #, :order => NextTaskOrder, :limit => limit)
  records.sort_by { rand() }
end

.last(opts = {}) ⇒ Object



20
21
22
# File 'lib/delayed/job/mongo_job.rb', line 20

def self.last(opts={})
  super(opts.merge(:order => 'id'))
end

.make_date(date) ⇒ Object



111
112
113
# File 'lib/delayed/job/mongo_job.rb', line 111

def self.make_date(date)
  "new Date(#{date.to_f * 1000})"
end

.make_query(string) ⇒ Object



119
120
121
# File 'lib/delayed/job/mongo_job.rb', line 119

def self.make_query(string)
  Mongo::Code.new("function() { return (#{string}); }")
end

.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME) ⇒ Object

Run the next job we can get an exclusive lock on. If no jobs are left we return nil



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/delayed/job.rb', line 59

def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)

  # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
  # this leads to a more even distribution of jobs across the worker processes
  find_available(5, max_run_time).each do |job|
    t = job.run_with_lock(max_run_time, worker_name)
    return t unless t == nil  # return if we did work (good or bad)
  end

  nil # we didn't do any work, all 5 were not lockable
end

.work_off(num = 100) ⇒ Object

Do num jobs and return stats on success/failure. Exit early if interrupted.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/delayed/job.rb', line 85

def self.work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case self.reserve_and_run_one_job
    when true
        success += 1
    when false
        failure += 1
    else
      break  # leave if no work could be done
    end
    break if $exit # leave if we're exiting
  end

  return [success, failure]
end

Instance Method Details

#failed?Boolean Also known as: failed

Returns:

  • (Boolean)


33
34
35
# File 'lib/delayed/job.rb', line 33

def failed?
  failed_at
end

#invoke_jobObject

Moved into its own method so that new_relic can trace it.



104
105
106
# File 'lib/delayed/job.rb', line 104

def invoke_job
  payload_object.perform
end

#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/delayed/job/mongo_job.rb', line 143

def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now

  affected_rows = if locked_by != worker
    overtime = make_date(now - max_run_time.to_i)
    query = "this._id == '#{id}' && (this.locked_at == null || this.locked_at < #{overtime})"

    conditions = {"$where" => make_query(query)}
    matches = collection.find(conditions).count
    collection.update(conditions, {"$set" => {:locked_at => now, :locked_by => worker}}, :multi => true)
    matches
  else
    conditions = {"_id" => Mongo::ObjectID.from_string(id), "locked_by" => worker} 
    matches = collection.find(conditions).count
    collection.update(conditions, {"$set" => {"locked_at" => now}}, :multi => true)
    matches
  end
  if affected_rows == 1
    self.locked_at    = now
    self.locked_by    = worker
    return true
  else
    return false
  end
end

#log_exception(error) ⇒ Object

This is a good hook if you need to report job processing errors in additional or different ways



78
79
80
81
# File 'lib/delayed/job.rb', line 78

def log_exception(error)
  logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
  logger.error(error)
end

#make_date(date) ⇒ Object



115
116
117
# File 'lib/delayed/job/mongo_job.rb', line 115

def make_date(date)
  self.class.make_date(date)
end

#make_query(string) ⇒ Object



123
124
125
# File 'lib/delayed/job/mongo_job.rb', line 123

def make_query(string)
  self.class.make_query(string)
end

#nameObject



42
43
44
45
46
47
48
49
50
51
# File 'lib/delayed/job.rb', line 42

def name
  @name ||= begin
    payload = payload_object
    if payload.respond_to?(:display_name)
      payload.display_name
    else
      payload.class.name
    end
  end
end

#payload_objectObject



38
39
40
# File 'lib/delayed/job.rb', line 38

def payload_object
  @payload_object ||= deserialize(self['handler'])
end

#payload_object=(object) ⇒ Object



53
54
55
# File 'lib/delayed/job.rb', line 53

def payload_object=(object)
  self['handler'] = object.to_yaml
end

#reschedule(message, backtrace = [], time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/delayed/job/mongo_job.rb', line 31

def reschedule(message, backtrace = [], time = nil)
  if self.attempts < MAX_ATTEMPTS
    time ||= Job.db_time_now + (attempts ** 4) + 5

    self.attempts    += 1
    self.run_at       = time
    self.last_error   = message + "\n" + backtrace.join("\n")
    self.unlock
    save!
  else
    logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
    destroy_failed_jobs ? destroy : update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end

#run_with_lock(max_run_time, worker_name) ⇒ Object

Try to run one job. Returns true/false (work done/work failed) or nil if job can’t be locked.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/delayed/job/mongo_job.rb', line 47

def run_with_lock(max_run_time, worker_name)
  logger.info "* [JOB] aquiring lock on #{name}"
  unless lock_exclusively!(max_run_time, worker_name)
    # We did not get the lock, some other worker process must have
    logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
    return nil # no work done
  end

  begin
    runtime =  Benchmark.realtime do
      invoke_job # TODO: raise error if takes longer than max_run_time
      logger.info "Also destroying self"
      destroy
    end
    # TODO: warn if runtime > max_run_time ?
    logger.info "* [JOB] #{name} completed after %.4f" % runtime
    return true  # did work
  rescue Exception => e
    reschedule e.message, e.backtrace
    log_exception(e)
    return false  # work failed
  end
end

#unlockObject

Unlock this job (note: not saved to DB)



72
73
74
75
# File 'lib/delayed/job.rb', line 72

def unlock
  self.locked_at    = nil
  self.locked_by    = nil
end