Class: Workhorse::DbJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Workhorse::DbJob
- Defined in:
- lib/workhorse/db_job.rb
Constant Summary collapse
- STATE_WAITING =
:waiting
- STATE_LOCKED =
:locked
- STATE_STARTED =
:started
- STATE_SUCCEEDED =
:succeeded
- STATE_FAILED =
:failed
- EXP_LOCKED_BY =
/^(.*?)\.(\d+?)\.([^.]+)$/.freeze
Class Method Summary collapse
- .failed ⇒ Object
- .locked ⇒ Object
- .started ⇒ Object
- .succeeded ⇒ Object
- .waiting ⇒ Object
- .with_split_locked_by ⇒ Object
Instance Method Summary collapse
- #assert_state!(*states) ⇒ Object
- #mark_failed!(exception) ⇒ Object
- #mark_locked!(worker_id) ⇒ Object
- #mark_started! ⇒ Object
- #mark_succeeded! ⇒ Object
-
#reset!(force = false) ⇒ Object
Resets job to state “waiting” and clears all meta fields set by workhorse in course of processing this job.
Class Method Details
.failed ⇒ Object
33 34 35 |
# File 'lib/workhorse/db_job.rb', line 33 def self.failed where(state: STATE_FAILED) end |
.locked ⇒ Object
21 22 23 |
# File 'lib/workhorse/db_job.rb', line 21 def self.locked where(state: STATE_LOCKED) end |
.started ⇒ Object
25 26 27 |
# File 'lib/workhorse/db_job.rb', line 25 def self.started where(state: STATE_STARTED) end |
.succeeded ⇒ Object
29 30 31 |
# File 'lib/workhorse/db_job.rb', line 29 def self.succeeded where(state: STATE_SUCCEEDED) end |
.waiting ⇒ Object
17 18 19 |
# File 'lib/workhorse/db_job.rb', line 17 def self.waiting where(state: STATE_WAITING) end |
.with_split_locked_by ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/workhorse/db_job.rb', line 38 def self.with_split_locked_by select(<<~SQL) #{table_name}.*, -- random string substring_index(locked_by, '.', -1) as locked_by_rnd, -- pid substring_index( substring_index(locked_by, '.', -2), '.', 1 ) as locked_by_pid, -- get host substring( locked_by, 1, length(locked_by) - length(substring_index(locked_by, '.', -2)) - 1 ) as locked_by_host SQL end |
Instance Method Details
#assert_state!(*states) ⇒ Object
139 140 141 142 143 |
# File 'lib/workhorse/db_job.rb', line 139 def assert_state!(*states) unless states.include?(state.to_sym) fail "Job #{id} is not in state #{states.inspect} but in state #{state.inspect}." end end |
#mark_failed!(exception) ⇒ Object
121 122 123 124 125 126 127 128 |
# File 'lib/workhorse/db_job.rb', line 121 def mark_failed!(exception) assert_state! STATE_LOCKED, STATE_STARTED self.failed_at = Time.now self.last_error = %(#{exception.}\n#{exception.backtrace.join("\n")}) self.state = STATE_FAILED save! end |
#mark_locked!(worker_id) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/workhorse/db_job.rb', line 94 def mark_locked!(worker_id) if changed? fail "Dirty jobs can't be locked." end if locked_at # TODO: Remove this debug output # puts "Already locked. Job: #{self.id} Worker: #{worker_id}" fail "Job #{id} is already locked by #{locked_by.inspect}." end self.locked_at = Time.now self.locked_by = worker_id self.state = STATE_LOCKED save! end |
#mark_started! ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/workhorse/db_job.rb', line 112 def mark_started! assert_state! STATE_LOCKED self.started_at = Time.now self.state = STATE_STARTED save! end |
#mark_succeeded! ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/workhorse/db_job.rb', line 131 def mark_succeeded! assert_state! STATE_STARTED self.succeeded_at = Time.now self.state = STATE_SUCCEEDED save! end |
#reset!(force = false) ⇒ Object
Resets job to state “waiting” and clears all meta fields set by workhorse in course of processing this job.
This is only allowed if the job is in a final state (“succeeded” or “failed”), as only those jobs are safe to modify; workhorse will not touch these jobs. To reset a job without checking the state it is in, set “force” to true. Prior to doing so, ensure that the job is not still being processed by a worker. If possible, shut down all workers before performing a forced reset.
After the job is reset, it will be performed again. If you reset a job that has already been performed (“succeeded”) or partially performed (“failed”), make sure the actions performed in the job are repeatable or have been rolled back. E.g. if the job already wrote something to an external API, it may cause inconsistencies if the job is performed again.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/workhorse/db_job.rb', line 77 def reset!(force = false) unless force assert_state! STATE_SUCCEEDED, STATE_FAILED end self.state = STATE_WAITING self.locked_at = nil self.locked_by = nil self.started_at = nil self.succeeded_at = nil self.failed_at = nil self.last_error = nil save! end |