Module: Delayed::Backend::Base
- Included in:
- ActiveRecord::Job, ActiveRecord::Job::Failed, Redis::Job, Redis::Job::Failed
- Defined in:
- lib/delayed/backend/base.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- ON_HOLD_LOCKED_BY =
'on hold'
- ON_HOLD_COUNT =
50
Class Method Summary collapse
Instance Method Summary collapse
- #batch? ⇒ Boolean
- #failed? ⇒ Boolean (also: #failed)
- #full_name ⇒ Object
- #hold! ⇒ Object
- #initialize_defaults ⇒ Object
-
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
- #locked? ⇒ Boolean
- #name ⇒ Object
- #on_hold? ⇒ Boolean
- #payload_object ⇒ Object
- #payload_object=(object) ⇒ Object
- #permanent_failure(error) ⇒ Object
-
#reschedule(error = nil, time = nil) ⇒ Object
Reschedule the job in the future (when a job fails).
- #reschedule_at ⇒ Object
- #unhold! ⇒ Object
-
#unlock ⇒ Object
Unlock this job (note: not saved to DB).
Class Method Details
.included(base) ⇒ Object
13 14 15 16 17 |
# File 'lib/delayed/backend/base.rb', line 13 def self.included(base) base.extend ClassMethods base.default_priority = Delayed::NORMAL_PRIORITY base.before_save :initialize_defaults end |
Instance Method Details
#batch? ⇒ Boolean
231 232 233 |
# File 'lib/delayed/backend/base.rb', line 231 def batch? payload_object.is_a?(Delayed::Batch::PerformableBatch) end |
#failed? ⇒ Boolean Also known as: failed
136 137 138 |
# File 'lib/delayed/backend/base.rb', line 136 def failed? failed_at end |
#full_name ⇒ Object
199 200 201 202 203 204 205 206 |
# File 'lib/delayed/backend/base.rb', line 199 def full_name obj = payload_object rescue nil if obj && obj.respond_to?(:full_name) obj.full_name else name end end |
#hold! ⇒ Object
258 259 260 261 262 263 |
# File 'lib/delayed/backend/base.rb', line 258 def hold! self.locked_by = ON_HOLD_LOCKED_BY self.locked_at = self.class.db_time_now self.attempts = ON_HOLD_COUNT self.save! end |
#initialize_defaults ⇒ Object
311 312 313 314 |
# File 'lib/delayed/backend/base.rb', line 311 def initialize_defaults self.queue ||= Delayed::Settings.queue self.run_at ||= self.class.db_time_now end |
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
221 222 223 224 225 226 227 228 229 |
# File 'lib/delayed/backend/base.rb', line 221 def invoke_job Delayed::Job.in_delayed_job = true begin payload_object.perform ensure Delayed::Job.in_delayed_job = false ::ActiveRecord::Base.clear_active_connections! unless Rails.env.test? end end |
#locked? ⇒ Boolean
241 242 243 |
# File 'lib/delayed/backend/base.rb', line 241 def locked? !!(self.locked_at || self.locked_by) end |
#name ⇒ Object
188 189 190 191 192 193 194 195 196 197 |
# File 'lib/delayed/backend/base.rb', line 188 def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) payload.display_name else payload.class.name end end end |
#on_hold? ⇒ Boolean
274 275 276 |
# File 'lib/delayed/backend/base.rb', line 274 def on_hold? self.locked_by == 'on hold' && self.locked_at && self.attempts == ON_HOLD_COUNT end |
#payload_object ⇒ Object
184 185 186 |
# File 'lib/delayed/backend/base.rb', line 184 def payload_object @payload_object ||= deserialize(self['handler'].untaint) end |
#payload_object=(object) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/delayed/backend/base.rb', line 208 def payload_object=(object) @payload_object = object self['handler'] = object.to_yaml self['tag'] = if object.respond_to?(:tag) object.tag elsif object.is_a?(Module) "#{object}.perform" else "#{object.class}#perform" end end |
#permanent_failure(error) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/delayed/backend/base.rb', line 162 def permanent_failure(error) begin # notify the payload_object of a permanent failure obj = payload_object obj.on_permanent_failure(error) if obj && obj.respond_to?(:on_permanent_failure) rescue DeserializationError # don't allow a failed deserialization to prevent destroying the job end # optionally destroy the object destroy_self = true if Delayed::Worker.on_max_failures destroy_self = Delayed::Worker.on_max_failures.call(self, error) end if destroy_self self.destroy else self.fail! end end |
#reschedule(error = nil, time = nil) ⇒ Object
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/delayed/backend/base.rb', line 143 def reschedule(error = nil, time = nil) begin obj = payload_object obj.on_failure(error) if obj && obj.respond_to?(:on_failure) rescue DeserializationError # don't allow a failed deserialization to prevent rescheduling end self.attempts += 1 if self.attempts >= (self.max_attempts || Delayed::Settings.max_attempts) permanent_failure error || "max attempts reached" else time ||= self.reschedule_at self.run_at = time self.unlock self.save! end end |
#reschedule_at ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/delayed/backend/base.rb', line 245 def reschedule_at new_time = self.class.db_time_now + (attempts ** 4) + 5 begin if payload_object.respond_to?(:reschedule_at) new_time = payload_object.reschedule_at( self.class.db_time_now, attempts) end rescue # TODO: just swallow errors from reschedule_at ? end new_time end |
#unhold! ⇒ Object
265 266 267 268 269 270 271 272 |
# File 'lib/delayed/backend/base.rb', line 265 def unhold! self.locked_by = nil self.locked_at = nil self.attempts = 0 self.run_at = [self.class.db_time_now, self.run_at].max self.failed_at = nil self.save! end |
#unlock ⇒ Object
Unlock this job (note: not saved to DB)
236 237 238 239 |
# File 'lib/delayed/backend/base.rb', line 236 def unlock self.locked_at = nil self.locked_by = nil end |