Class: Delayed::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/delayed/job.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Constant Summary collapse

MAX_ATTEMPTS =
25
MAX_RUN_TIME =
4.hours
NextTaskSQL =
'(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL AND finished_at IS NULL'
NextTaskOrder =
'priority DESC, run_at ASC'
ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



38
39
40
# File 'lib/delayed/job.rb', line 38

# Releases every lock held by the given worker by setting locked_by and
# locked_at to null on all rows whose locked_by matches, in one update_all.
#
# worker_name - the value to match against the locked_by column.
#
# Returns whatever update_all returns.
def clear_locks!(worker_name)
  release_sql = "locked_by = null, locked_at = null"
  held_by     = ["locked_by = ?", worker_name]
  update_all(release_sql, held_by)
end

.conditions_available(options = {}) ⇒ Object

Conditions used by the find_available method. This lives in a separate method so that it can be overridden (or method-chained) more easily, letting you customize the behaviour of your workers.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/delayed/job.rb', line 72

# Builds the ActiveRecord-style conditions array ([sql, *bind_values]) that
# selects runnable jobs.
#
# The base clause is NextTaskSQL, bound to the current DB time, the lock
# expiry cutoff (now - max_run_time) and the worker name. Each recognised
# option appends one more SQL fragment together with its bind value:
#
#   :max_run_time - lock expiry window (defaults to MAX_RUN_TIME)
#   :worker_name  - worker identity (defaults to Worker::DEFAULT_WORKER_NAME)
#   :min_priority - adds "priority >= ?"
#   :max_priority - adds "priority <= ?"
#   :job_types    - adds "job_type IN (?)"
#   :only_for     - adds "name LIKE ?" bound to "%value%"
#   :unless       - non-empty Array only; adds "<primary_key> NOT IN (?)"
#
# Returns an Array whose first element is the SQL string and whose remaining
# elements are the bind values, in placeholder order.
def conditions_available(options = {})
  run_limit = options[:max_run_time] || MAX_RUN_TIME
  owner     = options[:worker_name]  || Worker::DEFAULT_WORKER_NAME
  now       = db_time_now

  # SQL fragments and bind values are collected side by side so the
  # placeholders and their values cannot drift out of order.
  fragments = [NextTaskSQL]
  binds     = [now, now - run_limit, owner]

  add = lambda do |fragment, value|
    fragments << fragment
    binds << value
  end

  add.call(' AND (priority >= ?)', options[:min_priority]) if options[:min_priority]
  add.call(' AND (priority <= ?)', options[:max_priority]) if options[:max_priority]
  add.call(' AND (job_type IN (?))', options[:job_types]) if options[:job_types]
  add.call(' AND name LIKE ?', "%#{options[:only_for]}%") if options[:only_for]

  excluded = options[:unless]
  if excluded.is_a?(Array) && !excluded.empty?
    add.call(" AND (#{primary_key} NOT IN (?))", excluded)
  end

  # join always builds a new String, so the NextTaskSQL constant is never mutated.
  [fragments.join, *binds]
end

.enqueue(*args, &block) ⇒ Object

Add a job to the queue. Parameters (positional):

- job
- priority (default is 0)
- run_at (timestamp, default is right now)

You can omit the first parameter by passing a block instead; the block is then taken as the job to enqueue.



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/delayed/job.rb', line 56

# Adds a job to the queue.
#
# Positional arguments (after the job itself, when no block is given):
#   job      - any object responding to +perform+ (omitted when a block is passed;
#              the block is then wrapped in an EvaledJob)
#   priority - converted with to_i; defaults to 0 when absent or falsy
#   run_at   - passed through as-is (nil when absent)
#
# Raises ArgumentError when no block is given and the job does not respond
# to +perform+.
#
# Returns the result of Job.create.
def enqueue(*args, &block)
  from_block = block_given?
  payload    = from_block ? EvaledJob.new(&block) : args.shift

  if !from_block && !payload.respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  # Whatever is left in args is [priority, run_at]; missing entries become nil.
  priority, run_at = args

  Job.create(:payload_object => payload, :priority => (priority || 0).to_i, :run_at => run_at)
end

.find_available(options = {}) ⇒ Object

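Find a few candidate jobs to run (in case some immediately get locked by others). Return in random order to prevent everyone trying to do the same head job at once. (Note: the implementation shown below orders results by NextTaskOrder rather than randomly.)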



109
110
111
112
113
114
115
# File 'lib/delayed/job.rb', line 109

# Looks up candidate jobs using the conditions from conditions_available,
# ordered by NextTaskOrder, with the query run inside
# ActiveRecord::Base.silence.
#
# options - forwarded untouched to conditions_available; additionally
#           :limit, when truthy, caps the number of rows returned.
#
# Returns the result of find(:all, ...).
def find_available(options = {})
  query = { :conditions => conditions_available(options), :order => NextTaskOrder }
  limit = options[:limit]
  query[:limit] = limit if limit

  ActiveRecord::Base.silence { find(:all, query) }
end

.reserve_and_run_one_job(options = {}) ⇒ Object

Run the next job we can get an exclusive lock on. If no jobs are left we return nil



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/delayed/job.rb', line 119

# Walks the candidates returned by find_available and runs the first one
# that can be locked.
#
# Trying the next candidate whenever a lock cannot be obtained spreads jobs
# more evenly across worker processes.
#
# options - forwarded to find_available; :max_run_time and :worker_name
#           (defaulting to MAX_RUN_TIME / Worker::DEFAULT_WORKER_NAME) are
#           passed to each job's run_with_lock.
#
# Returns the first non-nil result of run_with_lock, or nil when every
# candidate returned nil (or there were no candidates).
def reserve_and_run_one_job(options = {})
  run_limit = options[:max_run_time] || MAX_RUN_TIME
  owner     = options[:worker_name]  || Worker::DEFAULT_WORKER_NAME

  find_available(options).each do |candidate|
    outcome = candidate.run_with_lock(run_limit, owner)
    return outcome unless outcome.nil?

    # nil means this candidate did no work; note it and move on to the next.
    Delayed::Worker.logger.info "#{owner} No work done."
  end

  # None of the candidates produced a result.
  nil
end

.unlock_all!Object

Unlock all jobs in the database; use with care!



43
44
45
46
47
48
# File 'lib/delayed/job.rb', line 43

# Clears the lock on every record returned by +unfinished+, saving each one
# (with save!, so a failed save raises) right after it is unlocked.
#
# Returns the collection that +unfinished+.each returns.
def unlock_all!
  unfinished.each { |record| record.tap(&:unlock).save! }
end

.work_off(options = {}) ⇒ Object

Do up to n jobs (options[:n], default 100) and return stats on success/failure. Exit early if interrupted.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/delayed/job.rb', line 139

# Runs up to options[:n] jobs (100 when not given) through
# reserve_and_run_one_job and tallies the outcomes.
#
# The loop ends early when a run returns anything other than true/false
# (no work could be done) or when Worker.exit is truthy after a job.
#
# options - forwarded to reserve_and_run_one_job; :n caps the number of
#           runs and :worker_name is used in the log line.
#
# Returns [success_count, failure_count].
def work_off(options = {})
  succeeded = 0
  failed    = 0

  (options[:n] || 100).times do
    outcome = reserve_and_run_one_job(options)

    if outcome.equal?(true)
      succeeded += 1
    elsif outcome.equal?(false)
      failed += 1
    else
      break # no work could be done, so stop asking for more
    end

    Delayed::Worker.logger.info "#{options[:worker_name]} job done..."
    break if Worker.exit # the worker is shutting down
  end

  [succeeded, failed]
end

Instance Method Details

#failed?Boolean Also known as: failed

class << self

Returns:

  • (Boolean)


161
162
163
# File 'lib/delayed/job.rb', line 161

# Reports whether this job has been marked as failed.
#
# Returns failed_at as-is (it is not coerced to true/false), so the result
# is truthy only when failed_at holds a value.
def failed?
  failed_at
end

#invoke_jobObject

Moved into its own method so that new_relic can trace it.



280
281
282
# File 'lib/delayed/job.rb', line 280

# Runs the job by calling +perform+ on payload_object.
#
# Kept as its own method so that it can be traced separately.
# Nothing is rescued here: any exception raised by +perform+ propagates to
# the caller.
#
# Returns whatever +perform+ returns.
def invoke_job
  payload_object.perform
end

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for the worker given as parameter (the name). Returns true if we have the lock, false otherwise.



250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/delayed/job.rb', line 250

# Tries to take (or refresh) the lock on this job for the named worker with a
# single conditional UPDATE.
#
# max_run_time - lock lifetime; converted with to_i and subtracted from the
#                current DB time to get the cutoff before which a lock
#                counts as expired.
# worker       - worker name to store in / compare with locked_by.
#
# Returns true when exactly one row was updated and the record reloaded,
# false when the update touched any other number of rows.
def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now

  updates, scope =
    if locked_by == worker
      # We already hold the lock (e.g. after a crash): just refresh locked_at.
      [["locked_at = ?", now],
       ["id = ? and locked_by = ?", id, worker]]
    else
      # Someone else (or nobody) holds it: claim it only if unlocked or expired.
      [["locked_at = ?, locked_by = ?", now, worker],
       ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]]
    end

  touched = self.class.update_all(updates, scope)
  touched == 1 && reload && true
end

#log_exception(error) ⇒ Object

This is a good hook if you need to report job processing errors in additional or different ways



270
271
272
273
274
275
276
277
# File 'lib/delayed/job.rb', line 270

# Writes two error-level entries to Delayed::Worker.logger for a failed job:
# a summary line (job name, error class, message, attempt count) followed by
# either the first line of error.to_s (when Delayed::HIDE_BACKTRACE is
# truthy) or the error object itself.
#
# error - the exception raised while processing the job.
#
# Returns the result of the second logger call.
def log_exception(error)
  log = Delayed::Worker.logger
  log.error "[JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"

  detail = Delayed::HIDE_BACKTRACE ? error.to_s.split("\n").first : error
  log.error detail
end

#nameObject



170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/delayed/job.rb', line 170

# Human-readable name of the job.
#
# For a persisted record this is the stored 'name' attribute. For a new
# record it is derived from the payload: its display_name when the payload
# responds to that, otherwise the payload's class name.
def name
  return self['name'] unless new_record?

  job = payload_object
  job.respond_to?(:display_name) ? job.display_name : job.class.name
end

#payload_objectObject



166
167
168
# File 'lib/delayed/job.rb', line 166

# Returns the work object for this job, deserializing the 'handler'
# attribute on first use and memoizing the result in @payload_object.
# A falsy deserialization result is not cached, so it is retried on the
# next call.
def payload_object
  return @payload_object if @payload_object

  @payload_object = deserialize(self['handler'])
end

#payload_object=(object) ⇒ Object



183
184
185
186
187
188
189
190
191
# File 'lib/delayed/job.rb', line 183

# Stores +object+ as this job's payload.
#
# Writes two attributes:
#   'job_type' - object.job_type when the object responds to it, otherwise
#                the object's class name
#   'handler'  - the object serialized with to_yaml
#
# The memoized @payload_object is cleared as well, so a later payload_object
# call deserializes the new handler instead of returning the payload that
# was cached before this assignment.
#
# object - the work object to enqueue.
def payload_object=(object)
  # Invalidate the reader's cache; otherwise it would keep serving the old payload.
  @payload_object = nil

  self['job_type'] = object.respond_to?(:job_type) ? object.job_type : object.class.to_s
  self['handler']  = object.to_yaml
end

#reschedule(message, backtrace = [], time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/delayed/job.rb', line 195

# Records a failure and either schedules another attempt or gives up.
#
# last_error is set to the message followed by the backtrace lines. The
# attempt limit comes from the payload's max_attempts when that call
# succeeds, otherwise from MAX_ATTEMPTS.
#
# While attempts is below the limit: attempts is incremented, run_at is
# moved to +time+ (default: DB now + attempts**4 + 5 seconds, computed from
# the pre-increment count) unless the incremented count has just reached
# the limit, and the job is unlocked and saved.
#
# Once the limit is reached: the job is destroyed when destroy_failed_jobs
# is truthy, otherwise failed_at is stamped and the job is saved.
#
# message   - error message to store.
# backtrace - Array of backtrace lines; nil (what Exception#backtrace
#             returns for an exception that was never raised) is treated
#             as empty.
# time      - explicit next run time; nil means use the computed backoff.
#
# Returns the result of save! or destroy.
def reschedule(message, backtrace = [], time = nil)
  # Array() tolerates a nil backtrace instead of raising TypeError on Array#+.
  self.last_error = ([message] + Array(backtrace)).join("\n")

  # Best effort: any StandardError while asking the payload (missing method,
  # payload that cannot be loaded, ...) falls back to the global limit.
  max_attempts =
    begin
      payload_object.send(:max_attempts)
    rescue StandardError
      MAX_ATTEMPTS
    end

  if self.attempts < max_attempts
    time ||= Job.db_time_now + (attempts ** 4) + 5

    self.attempts += 1
    self.run_at    = time unless self.attempts == max_attempts
    self.unlock
    save!
  else
    reason =
      if self.attempts > 0
        "because of #{attempts} consequetive failures."
      else
        "because no attempts for this job"
      end
    Delayed::Worker.logger.info "[JOB] PERMANENTLY removing #{name} #{reason}"

    if destroy_failed_jobs
      destroy
    else
      # Same clock source as the run_at computation above.
      self.failed_at = Job.db_time_now
      save!
    end
  end
end

#run_with_lock(max_run_time = MAX_RUN_TIME, worker_name = Worker::DEFAULT_WORKER_NAME) ⇒ Object

Try to run one job. Returns true/false (work done/work failed) or nil if job can’t be locked.



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/delayed/job.rb', line 222

# Tries to lock this job for +worker_name+ and run it once.
#
# max_run_time - lock lifetime passed to lock_exclusively!; its to_i value is
#                also the Timeout limit for the job itself.
# worker_name  - name of the worker taking the lock.
#
# On success the job is destroyed when destroy_successful_jobs is truthy,
# otherwise finished_at is stamped; the elapsed time is then logged.
# On any exception (including a timeout) the job is rescheduled and the
# error logged.
#
# Returns true when the job ran, false when it raised, nil when the lock
# could not be obtained.
def run_with_lock(max_run_time = MAX_RUN_TIME, worker_name = Worker::DEFAULT_WORKER_NAME)
  unless lock_exclusively!(max_run_time, worker_name)
    # We did not get the lock, some other worker process must have it.
    Delayed::Worker.logger.warn "[JOB] failed to aquire exclusive lock for #{name}"
    return nil # no work done
  end

  begin
    runtime = Benchmark.realtime do
      Timeout.timeout(max_run_time.to_i) do
        Delayed::Worker.logger.debug "-> #{Thread.current.object_id} invoke_job"
        invoke_job
        Delayed::Worker.logger.debug "<- #{Thread.current.object_id} invoke_job"
      end
    end

    if destroy_successful_jobs
      destroy
    else
      # Use the same clock as lock_exclusively! (the class's db_time_now).
      update_attribute(:finished_at, self.class.db_time_now)
    end

    # Format only the number. The job name must never be part of the format
    # string: a '%' in it would raise ArgumentError here, after the job has
    # already succeeded, and the rescue below would then reschedule it.
    Delayed::Worker.logger.info "[JOB] #{name} completed after #{'%.4f' % runtime}"
    true # did work
  rescue Exception => e
    # Deliberately broad: every failure of the job, whatever its class, must
    # end in a reschedule rather than leave the job locked.
    reschedule e.message, e.backtrace
    log_exception(e)
    false # work failed
  end
end

#unlockObject

Unlock this job (note: not saved to DB)



264
265
266
267
# File 'lib/delayed/job.rb', line 264

# Clears the lock fields on this job in memory only; the caller is
# responsible for saving the record.
#
# Returns nil.
def unlock
  self.locked_at = self.locked_by = nil
end