Class: Workhorse::DbJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Workhorse::DbJob
- Defined in:
- lib/workhorse/db_job.rb
Overview
ActiveRecord model representing a job in the database. This class manages the job lifecycle and state transitions within the Workhorse system.
Constant Summary collapse
- STATE_WAITING =
:waiting
- STATE_LOCKED =
:locked
- STATE_STARTED =
:started
- STATE_SUCCEEDED =
:succeeded
- STATE_FAILED =
:failed
- EXP_LOCKED_BY =
/^(.*?)\.(\d+?)\.([^.]+)$/.freeze
Class Method Summary collapse
-
.failed ⇒ ActiveRecord::Relation
Returns jobs in failed state.
-
.locked ⇒ ActiveRecord::Relation
Returns jobs in locked state.
-
.started ⇒ ActiveRecord::Relation
Returns jobs in started state.
-
.succeeded ⇒ ActiveRecord::Relation
Returns jobs in succeeded state.
-
.waiting ⇒ ActiveRecord::Relation
Returns jobs in waiting state.
-
.with_split_locked_by ⇒ ActiveRecord::Relation
Returns a relation with split locked_by field for easier querying.
Instance Method Summary collapse
-
#assert_state!(*states) ⇒ Object
Asserts that the job is in one of the specified states.
-
#mark_failed!(exception) ⇒ Object
Marks the job as failed with the given exception.
-
#mark_locked!(worker_id) ⇒ Object
Marks the job as locked by a specific worker.
-
#mark_started! ⇒ Object
Marks the job as started.
-
#mark_succeeded! ⇒ Object
Marks the job as succeeded.
-
#reset!(force = false) ⇒ Object
Resets job to state “waiting” and clears all meta fields set by workhorse in course of processing this job.
Class Method Details
.failed ⇒ ActiveRecord::Relation
Returns jobs in failed state.
61 62 63 |
# File 'lib/workhorse/db_job.rb', line 61 def self.failed where(state: STATE_FAILED) end |
.locked ⇒ ActiveRecord::Relation
Returns jobs in locked state.
40 41 42 |
# File 'lib/workhorse/db_job.rb', line 40 def self.locked where(state: STATE_LOCKED) end |
.started ⇒ ActiveRecord::Relation
Returns jobs in started state.
47 48 49 |
# File 'lib/workhorse/db_job.rb', line 47 def self.started where(state: STATE_STARTED) end |
.succeeded ⇒ ActiveRecord::Relation
Returns jobs in succeeded state.
54 55 56 |
# File 'lib/workhorse/db_job.rb', line 54 def self.succeeded where(state: STATE_SUCCEEDED) end |
.waiting ⇒ ActiveRecord::Relation
Returns jobs in waiting state.
33 34 35 |
# File 'lib/workhorse/db_job.rb', line 33 def self.waiting where(state: STATE_WAITING) end |
.with_split_locked_by ⇒ ActiveRecord::Relation
Returns a relation with split locked_by field for easier querying. Extracts host, PID, and random string components from locked_by.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/workhorse/db_job.rb', line 70 def self.with_split_locked_by select(<<~SQL) #{table_name}.*, -- random string substring_index(locked_by, '.', -1) as locked_by_rnd, -- pid substring_index( substring_index(locked_by, '.', -2), '.', 1 ) as locked_by_pid, -- get host substring( locked_by, 1, length(locked_by) - length(substring_index(locked_by, '.', -2)) - 1 ) as locked_by_host SQL end |
Instance Method Details
#assert_state!(*states) ⇒ Object
Asserts that the job is in one of the specified states.
192 193 194 195 196 |
# File 'lib/workhorse/db_job.rb', line 192 def assert_state!(*states) unless states.include?(state.to_sym) fail "Job #{id} is not in state #{states.inspect} but in state #{state.inspect}." end end |
#mark_failed!(exception) ⇒ Object
Marks the job as failed with the given exception.
167 168 169 170 171 172 173 174 |
# File 'lib/workhorse/db_job.rb', line 167 def mark_failed!(exception) assert_state! STATE_LOCKED, STATE_STARTED self.failed_at = Time.now self.last_error = %(#{exception.}\n#{exception.backtrace.join("\n")}) self.state = STATE_FAILED save! end |
#mark_locked!(worker_id) ⇒ Object
Marks the job as locked by a specific worker.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/workhorse/db_job.rb', line 133 def mark_locked!(worker_id) if changed? fail "Dirty jobs can't be locked." end if locked_at # TODO: Remove this debug output # puts "Already locked. Job: #{self.id} Worker: #{worker_id}" fail "Job #{id} is already locked by #{locked_by.inspect}." end self.locked_at = Time.now self.locked_by = worker_id self.state = STATE_LOCKED save! end |
#mark_started! ⇒ Object
Marks the job as started.
154 155 156 157 158 159 160 |
# File 'lib/workhorse/db_job.rb', line 154 def mark_started! assert_state! STATE_LOCKED self.started_at = Time.now self.state = STATE_STARTED save! end |
#mark_succeeded! ⇒ Object
Marks the job as succeeded.
180 181 182 183 184 185 186 |
# File 'lib/workhorse/db_job.rb', line 180 def mark_succeeded! assert_state! STATE_STARTED self.succeeded_at = Time.now self.state = STATE_SUCCEEDED save! end |
#reset!(force = false) ⇒ Object
Resets job to state “waiting” and clears all meta fields set by workhorse in course of processing this job.
This is only allowed if the job is in a final state (“succeeded” or “failed”), as only those jobs are safe to modify; workhorse will not touch these jobs. To reset a job without checking the state it is in, set “force” to true. Prior to doing so, ensure that the job is not still being processed by a worker. If possible, shut down all workers before performing a forced reset.
After the job is reset, it will be performed again. If you reset a job that has already been performed (“succeeded”) or partially performed (“failed”), make sure the actions performed in the job are repeatable or have been rolled back. E.g. if the job already wrote something to an external API, it may cause inconsistencies if the job is performed again.
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/workhorse/db_job.rb', line 112 def reset!(force = false) unless force assert_state! STATE_SUCCEEDED, STATE_FAILED end self.state = STATE_WAITING self.locked_at = nil self.locked_by = nil self.started_at = nil self.succeeded_at = nil self.failed_at = nil self.last_error = nil save! end |