Class: Resque::Durable::QueueAudit
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Resque::Durable::QueueAudit
- Extended by:
- Recovery
- Defined in:
- lib/resque/durable/queue_audit.rb
Defined Under Namespace
Modules: Recovery
Constant Summary collapse
- JobCollision =
Class.new(StandardError)
- DEFAULT_DURATION =
id enqueued_id queue_name payload enqueue_count enqueued_at completed_at timeout_at updated_at created_at
10.minutes
Class Method Summary collapse
Instance Method Summary collapse
- #complete! ⇒ Object
- #complete? ⇒ Boolean
-
#delay ⇒ Object
1, 8, 27, 64, 125, 216, etc.
- #duration ⇒ Object
- #enqueue ⇒ Object
- #enqueued! ⇒ Object
- #fail! ⇒ Object
- #heartbeat! ⇒ Object
- #job_klass ⇒ Object
- #job_klass=(klass) ⇒ Object
-
#optimistic_heartbeat!(last_timeout_at) ⇒ Object
Bumps the
timeout_atcolumn, but raises aJobCollisionexception if another process has changed the value, indicating we may have multiple workers processing the same job. - #payload ⇒ Object
- #payload=(value) ⇒ Object
- #queue ⇒ Object
- #reset_backoff!(timeout_at = Time.now.utc) ⇒ Object
- #retryable? ⇒ Boolean
Methods included from Recovery
Class Method Details
Instance Method Details
#complete! ⇒ Object
116 117 118 119 |
# File 'lib/resque/durable/queue_audit.rb', line 116 def complete! self.completed_at = Time.now.utc save! end |
#complete? ⇒ Boolean
121 122 123 |
# File 'lib/resque/durable/queue_audit.rb', line 121 def complete? completed_at.present? end |
#delay ⇒ Object
1, 8, 27, 64, 125, 216, etc. minutes.
130 131 132 |
# File 'lib/resque/durable/queue_audit.rb', line 130 def delay (enqueue_count ** 3).minutes end |
#duration ⇒ Object
85 86 87 |
# File 'lib/resque/durable/queue_audit.rb', line 85 def duration job_klass.job_timeout end |
#enqueue ⇒ Object
81 82 83 |
# File 'lib/resque/durable/queue_audit.rb', line 81 def enqueue job_klass.enqueue(*(payload.push(self))) end |
#enqueued! ⇒ Object
109 110 111 112 113 114 |
# File 'lib/resque/durable/queue_audit.rb', line 109 def enqueued! self.enqueued_at = Time.now.utc self.timeout_at = enqueued_at + duration self.enqueue_count += 1 save! end |
#fail! ⇒ Object
105 106 107 |
# File 'lib/resque/durable/queue_audit.rb', line 105 def fail! update_attribute(:timeout_at, Time.now.utc) end |
#heartbeat! ⇒ Object
89 90 91 |
# File 'lib/resque/durable/queue_audit.rb', line 89 def heartbeat! update_attribute(:timeout_at, Time.now.utc + duration) end |
#job_klass ⇒ Object
61 62 63 |
# File 'lib/resque/durable/queue_audit.rb', line 61 def job_klass read_attribute(:job_klass).constantize end |
#job_klass=(klass) ⇒ Object
65 66 67 |
# File 'lib/resque/durable/queue_audit.rb', line 65 def job_klass=(klass) write_attribute(:job_klass, klass.to_s) end |
#optimistic_heartbeat!(last_timeout_at) ⇒ Object
Bumps the timeout_at column, but raises a JobCollision exception if another process has changed the value, indicating we may have multiple workers processing the same job.
96 97 98 99 100 101 102 103 |
# File 'lib/resque/durable/queue_audit.rb', line 96 def optimistic_heartbeat!(last_timeout_at) next_timeout_at = Time.now.utc + duration nrows = self.class. where(id: id, timeout_at: last_timeout_at). update_all(timeout_at: next_timeout_at) raise JobCollision.new unless nrows == 1 next_timeout_at end |
#payload ⇒ Object
69 70 71 |
# File 'lib/resque/durable/queue_audit.rb', line 69 def payload ActiveSupport::JSON.decode(super) end |
#payload=(value) ⇒ Object
73 74 75 |
# File 'lib/resque/durable/queue_audit.rb', line 73 def payload=(value) super value.to_json end |
#queue ⇒ Object
77 78 79 |
# File 'lib/resque/durable/queue_audit.rb', line 77 def queue Resque.queue_from_class(job_klass) end |
#reset_backoff!(timeout_at = Time.now.utc) ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/resque/durable/queue_audit.rb', line 134 def reset_backoff!(timeout_at = Time.now.utc) # Set timeout_at = Time.now and enqueue_count = 1 so # the job can be picked up by the Durable Monitor asap. self.timeout_at = timeout_at self.enqueue_count = 1 save! end |
#retryable? ⇒ Boolean
125 126 127 |
# File 'lib/resque/durable/queue_audit.rb', line 125 def retryable? Time.now.utc > (timeout_at + delay) end |