Module: Delayed::Common::ClassMethods
- Defined in:
- lib/delayed/job/common.rb
Instance Method Summary collapse
- #clear_locks! ⇒ Object
- #enqueue(*args, &block) ⇒ Object
- #find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME) ⇒ Object
-
#invoke_job(job, &block) ⇒ Object
Moved into its own method so that new_relic can trace it.
-
#log_exception(job, error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways.
-
#reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block) ⇒ Object
Get the payload of the next job we can get an exclusive lock on.
- #work_off(num = 100) ⇒ Object
Instance Method Details
#clear_locks! ⇒ Object
212 213 214 |
# File 'lib/delayed/job/common.rb', line 212

# Releases the locks recorded under this worker's name.
#
# Hands +update_all+ an update fragment that nulls +locked_by+ and
# +locked_at+, together with a condition matching +locked_by+ against
# +worker_name+.
#
# @return [Object] whatever +update_all+ returns
def clear_locks!
  release_lock_sql = "locked_by = null, locked_at = null"
  held_by_worker   = ["locked_by = ?", worker_name]

  update_all(release_lock_sql, held_by_worker)
end
#enqueue(*args, &block) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/delayed/job/common.rb', line 136

# Creates a Job record for the given work item.
#
# When a block is given it is wrapped in an +EvaledJob+ and used as the
# payload; otherwise the first positional argument is taken as the payload.
# The next two positional arguments are the priority and the run-at value.
#
# @param args [Array] +[payload, priority, run_at]+ without a block,
#   +[priority, run_at]+ with a block; priority falls back to 0 when absent
# @return [Object] the result of +Job.create+
# @raise [ArgumentError] if no block is given and the payload does not
#   respond to +perform+
def enqueue(*args, &block)
  object =
    if block_given?
      EvaledJob.new(&block)
    else
      args.shift
    end

  performable = object.respond_to?(:perform) || block_given?
  raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless performable

  # Whatever is left in args after the optional shift above is (priority, run_at).
  priority, run_at = args
  priority ||= 0

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
#find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/delayed/job/common.rb', line 149

# Looks up candidate jobs and returns them in random order.
#
# The query starts from a copy of +Delayed::Job::NextTaskSQL+, bound to the
# current database time, that time minus +max_run_time+, and +worker_name+.
# Priority bounds are appended only when +min_priority+ / +max_priority+ are
# set. Results are ordered by +Delayed::Job::NextTaskOrder+ and capped at
# +limit+ before being shuffled.
#
# @param limit [Integer] maximum number of records to fetch
# @param max_run_time [Numeric] subtracted from the current database time to
#   build the second bind value
# @return [Array] the fetched records in random order
def find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME)
  time_now = db_time_now

  # Work on a copy so the shared SQL constant is never mutated by <<.
  sql = Delayed::Job::NextTaskSQL.dup
  conditions = [time_now, time_now - max_run_time, worker_name]

  if min_priority
    sql << ' AND (priority >= ?)'
    conditions << min_priority
  end

  if max_priority
    sql << ' AND (priority <= ?)'
    conditions << max_priority
  end

  # The finished SQL fragment goes first, followed by its bind values.
  conditions.unshift(sql)

  # DM vs. AR: a model that responds to +find+ takes the ActiveRecord path.
  records =
    if respond_to?(:find)
      ActiveRecord::Base.silence do
        find(:all, :conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
      end
    else
      orig_level = DataMapper.logger.level
      DataMapper.logger.level = :error
      begin
        all(:conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
      ensure
        # Restore the log level even when the query raises; otherwise the
        # logger would stay at :error.
        DataMapper.logger.level = orig_level
      end
    end

  records.sort_by { rand }
end
#invoke_job(job, &block) ⇒ Object
Moved into its own method so that new_relic can trace it.
243 244 245 |
# File 'lib/delayed/job/common.rb', line 243

# Runs the supplied block with +job+ as its only argument.
#
# Kept as a separate method so that new_relic can trace it.
#
# @param job [Object] value handed to the block
# @return [Object] the block's return value
def invoke_job(job, &block)
  block.call(job)
end
#log_exception(job, error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways.
217 218 219 220 |
# File 'lib/delayed/job/common.rb', line 217

# Writes a failed job to the error log.
#
# Emits two entries through +logger.error+: a one-line summary (job name,
# error class, error message, attempt count) followed by the error object
# itself. This is a good hook if you need to report job processing errors in
# additional or different ways.
#
# @param job [#name, #attempts] the job that failed
# @param error [Exception] the error raised while processing it
# @return [Object] whatever the final +logger.error+ call returns
def log_exception(job, error)
  logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
  logger.error(error)
end
#reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block) ⇒ Object
Get the payload of the next job we can get an exclusive lock on. If no jobs are left, we return nil.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/delayed/job/common.rb', line 185

# Locks the next available job, runs the block on its payload, and returns
# the job.
#
# On success the job is destroyed. If the block (or the destroy) raises a
# StandardError, the job is rescheduled with the error's message and
# backtrace, the failure is logged, and the job is still returned. A job
# whose lock cannot be taken is skipped in favour of the next candidate.
#
# @param max_run_time [Numeric] passed to +find_available+ and
#   +lock_exclusively!+
# @yield [payload] the locked job's +payload_object+
# @return [Object, nil] the job that was processed (successfully or not), or
#   nil when no job could be locked
def reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block)
  # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
  # This leads to a more even distribution of jobs across the worker processes.
  find_available(5, max_run_time).each do |job|
    begin
      logger.info "* [JOB] aquiring lock on #{job.name}"
      job.lock_exclusively!(max_run_time, worker_name)

      # The destroy is timed together with the work itself.
      runtime = Benchmark.realtime do
        invoke_job(job.payload_object, &block)
        job.destroy
      end

      logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
      return job
    rescue LockError
      # We did not get the lock, some other worker process must have;
      # fall through to the next candidate.
      logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
    rescue StandardError => e
      job.reschedule e.message, e.backtrace
      log_exception(job, e)
      return job
    end
  end

  nil
end
#work_off(num = 100) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/delayed/job/common.rb', line 222

# Reserves and performs jobs, up to +num+ of them, stopping early once
# +reserve+ returns nil.
#
# A payload whose +perform+ raises is counted as a failure and the error is
# re-raised into +reserve+.
#
# @param num [Integer] maximum number of reserve attempts
# @return [Array(Integer, Integer)] +[success_count, failure_count]+
def work_off(num = 100)
  succeeded = 0
  failed = 0

  num.times do
    reserved = reserve do |payload|
      begin
        payload.perform
        succeeded += 1
      rescue
        failed += 1
        # Re-raise so the caller that yielded the payload sees the failure too.
        raise
      end
    end

    break if reserved.nil?
  end

  [succeeded, failed]
end