Class: Resque::Durable::QueueAudit
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Resque::Durable::QueueAudit
- Extended by:
- Recovery
- Defined in:
- lib/resque/durable/queue_audit.rb
Defined Under Namespace
Modules: Recovery
Constant Summary collapse
- JobCollision =
Class.new(StandardError)
- DEFAULT_DURATION =
id enqueued_id queue_name payload enqueue_count enqueued_at completed_at timeout_at updated_at created_at
10.minutes
Class Method Summary collapse
Instance Method Summary collapse
- #complete! ⇒ Object
- #complete? ⇒ Boolean
-
#delay ⇒ Object
1, 8, 27, 64, 125, 216, etc.
- #duration ⇒ Object
- #enqueue ⇒ Object
- #enqueued! ⇒ Object
- #fail! ⇒ Object
- #heartbeat! ⇒ Object
- #job_klass ⇒ Object
- #job_klass=(klass) ⇒ Object
-
#optimistic_heartbeat!(last_timeout_at) ⇒ Object
Bumps the ‘timeout_at` column, but raises a `JobCollision` exception if another process has changed the value, indicating we may have multiple workers processing the same job.
- #payload ⇒ Object
- #payload=(value) ⇒ Object
- #queue ⇒ Object
- #reset_backoff!(timeout_at = Time.now.utc) ⇒ Object
- #retryable? ⇒ Boolean
Methods included from Recovery
Class Method Details
Instance Method Details
#complete! ⇒ Object
114 115 116 117 |
# File 'lib/resque/durable/queue_audit.rb', line 114 def complete! self.completed_at = Time.now.utc save! end |
#complete? ⇒ Boolean
119 120 121 |
# File 'lib/resque/durable/queue_audit.rb', line 119 def complete? completed_at.present? end |
#delay ⇒ Object
1, 8, 27, 64, 125, 216, etc. minutes.
128 129 130 |
# File 'lib/resque/durable/queue_audit.rb', line 128 def delay (enqueue_count ** 3).minutes end |
#duration ⇒ Object
83 84 85 |
# File 'lib/resque/durable/queue_audit.rb', line 83 def duration job_klass.job_timeout end |
#enqueue ⇒ Object
79 80 81 |
# File 'lib/resque/durable/queue_audit.rb', line 79 def enqueue job_klass.enqueue(*(payload.push(becomes(job_klass.auditor)))) end |
#enqueued! ⇒ Object
107 108 109 110 111 112 |
# File 'lib/resque/durable/queue_audit.rb', line 107 def enqueued! self.enqueued_at = Time.now.utc self.timeout_at = enqueued_at + duration self.enqueue_count += 1 save! end |
#fail! ⇒ Object
103 104 105 |
# File 'lib/resque/durable/queue_audit.rb', line 103 def fail! update_attribute(:timeout_at, Time.now.utc) end |
#heartbeat! ⇒ Object
87 88 89 |
# File 'lib/resque/durable/queue_audit.rb', line 87 def heartbeat! update_attribute(:timeout_at, Time.now.utc + duration) end |
#job_klass ⇒ Object
59 60 61 |
# File 'lib/resque/durable/queue_audit.rb', line 59 def job_klass read_attribute(:job_klass).constantize end |
#job_klass=(klass) ⇒ Object
63 64 65 |
# File 'lib/resque/durable/queue_audit.rb', line 63 def job_klass=(klass) write_attribute(:job_klass, klass.to_s) end |
#optimistic_heartbeat!(last_timeout_at) ⇒ Object
Bumps the ‘timeout_at` column, but raises a `JobCollision` exception if another process has changed the value, indicating we may have multiple workers processing the same job.
94 95 96 97 98 99 100 101 |
# File 'lib/resque/durable/queue_audit.rb', line 94 def optimistic_heartbeat!(last_timeout_at) next_timeout_at = Time.now.utc + duration nrows = self.class. where(id: id, timeout_at: last_timeout_at). update_all(timeout_at: next_timeout_at) raise JobCollision.new unless nrows == 1 next_timeout_at end |
#payload ⇒ Object
67 68 69 |
# File 'lib/resque/durable/queue_audit.rb', line 67 def payload ActiveSupport::JSON.decode(super) end |
#payload=(value) ⇒ Object
71 72 73 |
# File 'lib/resque/durable/queue_audit.rb', line 71 def payload=(value) super value.to_json end |
#queue ⇒ Object
75 76 77 |
# File 'lib/resque/durable/queue_audit.rb', line 75 def queue Resque.queue_from_class(job_klass) end |
#reset_backoff!(timeout_at = Time.now.utc) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/resque/durable/queue_audit.rb', line 132 def reset_backoff!(timeout_at = Time.now.utc) # Set timeout_at = Time.now and enqueue_count = 1 so # the job can be picked up by the Durable Monitor asap. self.timeout_at = timeout_at self.enqueue_count = 1 save! end |
#retryable? ⇒ Boolean
123 124 125 |
# File 'lib/resque/durable/queue_audit.rb', line 123 def retryable? Time.now.utc > (timeout_at + delay) end |