Module: Delayed::Common::ClassMethods

Defined in:
lib/delayed/job/common.rb

Instance Method Summary collapse

Instance Method Details

#clear_locks!Object



212
213
214
# File 'lib/delayed/job/common.rb', line 212

def clear_locks!
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

#enqueue(*args, &block) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/delayed/job/common.rb', line 136

def enqueue(*args, &block)
  object = block_given? ? EvaledJob.new(&block) : args.shift

  unless object.respond_to?(:perform) || block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  priority = args.first || 0
  run_at   = args[1]

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

#find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/delayed/job/common.rb', line 149

def find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME)

  time_now = db_time_now

  sql = Delayed::Job::NextTaskSQL.dup

  conditions = [time_now, time_now - max_run_time, worker_name]

  if self.min_priority
    sql << ' AND (priority >= ?)'
    conditions << min_priority
  end

  if self.max_priority
    sql << ' AND (priority <= ?)'
    conditions << max_priority
  end

  conditions.unshift(sql)
  
  #DM vs. AR
  if self.respond_to?(:find)
    records = ActiveRecord::Base.silence do
      find(:all, :conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
    end
  else
    orig, DataMapper.logger.level = DataMapper.logger.level, :error
    records = all(:conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
    DataMapper.logger.level = orig
  end

  records.sort_by { rand() }
end

#invoke_job(job, &block) ⇒ Object

Moved into its own method so that new_relic can trace it.



243
244
245
# File 'lib/delayed/job/common.rb', line 243

def invoke_job(job, &block)
  block.call(job)
end

#log_exception(job, error) ⇒ Object

This is a good hook if you need to report job processing errors in additional or different ways



217
218
219
220
# File 'lib/delayed/job/common.rb', line 217

def log_exception(job, error)
  logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
  logger.error(error)
end

#reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block) ⇒ Object

Get the payload of the next job we can get an exclusive lock on. If no jobs are left we return nil



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/delayed/job/common.rb', line 185

def reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block)
  # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
  # this leads to a more even distribution of jobs across the worker processes
  find_available(5, max_run_time).each do |job|
    begin
      logger.info "* [JOB] aquiring lock on #{job.name}"
      job.lock_exclusively!(max_run_time, worker_name)
      runtime =  Benchmark.realtime do
        invoke_job(job.payload_object, &block)
        job.destroy
      end
      logger.info "* [JOB] #{job.name} completed after %.4f" % runtime

      return job
    rescue LockError
      # We did not get the lock, some other worker process must have
      logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
    rescue StandardError => e
      job.reschedule e.message, e.backtrace
      log_exception(job, e)
      return job
    end
  end

  nil
end

#work_off(num = 100) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/delayed/job/common.rb', line 222

def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    job = self.reserve do |j|
      begin
        j.perform
        success += 1
      rescue
        failure += 1
        raise
      end
    end

    break if job.nil?
  end

  return [success, failure]
end