Class: Workhorse::DbJob

Inherits:
ActiveRecord::Base
Defined in:
lib/workhorse/db_job.rb

Overview

ActiveRecord model representing a job in the database. It records each job's state (waiting, locked, started, succeeded, or failed) and provides the methods that move a job from one state to the next within Workhorse.

Examples:

Creating a job

job = DbJob.create!(
  queue: 'default',
  handler: Marshal.dump(job_instance),
  priority: 0
)

Querying jobs by state

waiting_jobs = DbJob.waiting
failed_jobs = DbJob.failed

Constant Summary

STATE_WAITING = :waiting
STATE_LOCKED = :locked
STATE_STARTED = :started
STATE_SUCCEEDED = :succeeded
STATE_FAILED = :failed
EXP_LOCKED_BY = /^(.*?)\.(\d+?)\.([^.]+)$/.freeze
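
EXP_LOCKED_BY appears to describe the "host.pid.random" layout of the locked_by column, the same three parts that with_split_locked_by extracts in SQL. The following is a minimal sketch of how the pattern splits such a value. The pattern is copied into a local variable so the snippet runs on its own, and the worker ID is a made-up example rather than output from a real worker.

```ruby
# Local copy of the EXP_LOCKED_BY pattern so this snippet is standalone.
pattern = /^(.*?)\.(\d+?)\.([^.]+)$/

# Hypothetical worker ID in "host.pid.random" form.
locked_by = 'worker-01.example.com.4242.a1b2c3'

match = pattern.match(locked_by)

# The host may itself contain dots; the PID and the random suffix may not.
host, pid, rnd = match.captures

puts "host=#{host} pid=#{pid} rnd=#{rnd}"
```

Because the first group is lazy and the second accepts digits only, the match settles on the last all-digit segment that is followed by a single dot-free segment.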

Class Method Details

.failed ⇒ ActiveRecord::Relation

Returns jobs in failed state.

Returns:

  • (ActiveRecord::Relation)

    Jobs that failed during execution



# File 'lib/workhorse/db_job.rb', line 61

def self.failed
  where(state: STATE_FAILED)
end

.locked ⇒ ActiveRecord::Relation

Returns jobs in locked state.

Returns:

  • (ActiveRecord::Relation)

    Jobs currently locked by workers



# File 'lib/workhorse/db_job.rb', line 40

def self.locked
  where(state: STATE_LOCKED)
end

.started ⇒ ActiveRecord::Relation

Returns jobs in started state.

Returns:

  • (ActiveRecord::Relation)

    Jobs currently being executed



# File 'lib/workhorse/db_job.rb', line 47

def self.started
  where(state: STATE_STARTED)
end

.succeeded ⇒ ActiveRecord::Relation

Returns jobs in succeeded state.

Returns:

  • (ActiveRecord::Relation)

    Jobs that completed successfully



# File 'lib/workhorse/db_job.rb', line 54

def self.succeeded
  where(state: STATE_SUCCEEDED)
end

.waiting ⇒ ActiveRecord::Relation

Returns jobs in waiting state.

Returns:

  • (ActiveRecord::Relation)

    Jobs waiting to be processed



# File 'lib/workhorse/db_job.rb', line 33

def self.waiting
  where(state: STATE_WAITING)
end

.with_split_locked_by ⇒ ActiveRecord::Relation

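Returns a relation that selects all job columns plus three computed columns extracted from locked_by: locked_by_host, locked_by_pid, and locked_by_rnd (the random string). This makes it easier to query jobs by the host or process that locked them.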

Returns:

  • (ActiveRecord::Relation)

    Relation with additional computed columns



# File 'lib/workhorse/db_job.rb', line 70

def self.with_split_locked_by
  select(<<~SQL)
    #{table_name}.*,

    -- random string
    substring_index(locked_by, '.', -1) as locked_by_rnd,

    -- pid
    substring_index(
      substring_index(locked_by, '.', -2),
      '.',
      1
    ) as locked_by_pid,

    -- get host
    substring(
      locked_by,
      1,
      length(locked_by) -
      length(substring_index(locked_by, '.', -2)) - 1
    ) as locked_by_host
  SQL
end
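
To make the SQL above easier to follow, here is a plain-Ruby sketch of the same splitting logic. substring_index is a MySQL/MariaDB function, so the query presumably targets one of those databases. The helper names substring_index and split_locked_by, and the sample worker ID, are hypothetical and exist only for this illustration; they are not part of Workhorse.

```ruby
# Plain-Ruby stand-in for MySQL's SUBSTRING_INDEX(str, delim, count):
# a positive count keeps everything left of the count-th delimiter,
# a negative count keeps everything right of the count-th delimiter from the end.
def substring_index(str, delim, count)
  parts = str.split(delim, -1)
  count.positive? ? parts.first(count).join(delim) : parts.last(-count).join(delim)
end

# Mirrors the three computed columns of the SQL query.
def split_locked_by(locked_by)
  tail = substring_index(locked_by, '.', -2) # "pid.rnd"
  {
    locked_by_rnd:  substring_index(locked_by, '.', -1),
    locked_by_pid:  substring_index(tail, '.', 1),
    # SQL substring() is 1-based, Ruby slicing is 0-based.
    # Cut off ".pid.rnd" from the end to leave the host.
    locked_by_host: locked_by[0, locked_by.length - tail.length - 1]
  }
end

# Hypothetical worker ID in "host.pid.random" form.
parts = split_locked_by('worker-01.example.com.4242.a1b2c3')
puts parts.inspect
```

On records loaded through with_split_locked_by, the three aliases should be readable as ordinary attributes, since ActiveRecord exposes columns named in a custom select on the returned objects.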

Instance Method Details

#assert_state!(*states) ⇒ Object

Asserts that the job is in one of the specified states.

Parameters:

  • states (Array<Symbol>)

    Valid states for the job

Raises:

  • (RuntimeError)

    If the job is not in any of the specified states



# File 'lib/workhorse/db_job.rb', line 192

def assert_state!(*states)
  unless states.include?(state.to_sym)
    fail "Job #{id} is not in state #{states.inspect} but in state #{state.inspect}."
  end
end

#mark_failed!(exception) ⇒ Object

Marks the job as failed with the given exception.

Parameters:

  • exception (Exception)

    The exception that caused the failure

Raises:

  • (RuntimeError)

    If the job is not in locked or started state



# File 'lib/workhorse/db_job.rb', line 167

def mark_failed!(exception)
  assert_state! STATE_LOCKED, STATE_STARTED

  self.failed_at  = Time.now
  self.last_error = %(#{exception.message}\n#{exception.backtrace.join("\n")})
  self.state      = STATE_FAILED
  save!
end
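
The value stored in last_error is the exception message followed by the backtrace, one frame per line. The sketch below reproduces only that formatting step outside the model. format_last_error is a hypothetical helper name and the ArgumentError is an invented failure. Note that Ruby fills in backtrace only when an exception is raised, so this formatting assumes a rescued exception; an exception object that was merely constructed has a nil backtrace.

```ruby
# Builds the same "message + backtrace" text that mark_failed! assigns to last_error.
def format_last_error(exception)
  %(#{exception.message}\n#{exception.backtrace.join("\n")})
end

last_error = nil
begin
  raise ArgumentError, 'invalid payload' # hypothetical failure inside a job
rescue ArgumentError => e
  last_error = format_last_error(e)
end

# The first line is the message; the remaining lines are backtrace frames.
first_line = last_error.lines.first.chomp
puts first_line

# An exception that was never raised carries no backtrace.
never_raised = StandardError.new('not raised')
puts never_raised.backtrace.inspect
```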

#mark_locked!(worker_id) ⇒ Object

Marks the job as locked by a specific worker.

Parameters:

  • worker_id (String)

    The ID of the worker locking this job

Raises:

  • (RuntimeError)

    If the job has unsaved changes or is already locked



# File 'lib/workhorse/db_job.rb', line 133

def mark_locked!(worker_id)
  if changed?
    fail "Dirty jobs can't be locked."
  end

  if locked_at
    # TODO: Remove this debug output
    # puts "Already locked. Job: #{self.id} Worker: #{worker_id}"
    fail "Job #{id} is already locked by #{locked_by.inspect}."
  end

  self.locked_at = Time.now
  self.locked_by = worker_id
  self.state     = STATE_LOCKED
  save!
end

#mark_started! ⇒ Object

Marks the job as started.

Raises:

  • (RuntimeError)

    If the job is not in locked state



# File 'lib/workhorse/db_job.rb', line 154

def mark_started!
  assert_state! STATE_LOCKED

  self.started_at = Time.now
  self.state      = STATE_STARTED
  save!
end

#mark_succeeded! ⇒ Object

Marks the job as succeeded.

Raises:

  • (RuntimeError)

    If the job is not in started state



# File 'lib/workhorse/db_job.rb', line 180

def mark_succeeded!
  assert_state! STATE_STARTED

  self.succeeded_at = Time.now
  self.state        = STATE_SUCCEEDED
  save!
end

#reset!(force = false) ⇒ Object

Resets the job to state “waiting” and clears all meta fields set by Workhorse in the course of processing this job.

This is only allowed if the job is in a final state (“succeeded” or “failed”), as only those jobs are safe to modify; workhorse will not touch these jobs. To reset a job without checking the state it is in, set “force” to true. Prior to doing so, ensure that the job is not still being processed by a worker. If possible, shut down all workers before performing a forced reset.

After the job is reset, it will be performed again. If you reset a job that has already been performed (“succeeded”) or partially performed (“failed”), make sure the actions performed in the job are repeatable or have been rolled back. For example, if the job already wrote something to an external API, performing it again may cause inconsistencies.

Parameters:

  • force (Boolean) (defaults to: false)

    Whether to force reset without state validation

Raises:

  • (RuntimeError)

    If job is not in a final state and force is false



# File 'lib/workhorse/db_job.rb', line 112

def reset!(force = false)
  unless force
    assert_state! STATE_SUCCEEDED, STATE_FAILED
  end

  self.state = STATE_WAITING
  self.locked_at = nil
  self.locked_by = nil
  self.started_at = nil
  self.succeeded_at = nil
  self.failed_at = nil
  self.last_error = nil

  save!
end
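
Taken together, the instance methods above form a small state machine: waiting, then locked, then started, then succeeded or failed, with reset! returning a finished job to waiting. The following sketch models only those state checks in memory. SketchJob is a hypothetical stand-in, not the real model: it has no database, no timestamps, and no save!, and it detects an existing lock through locked_by where the real method checks locked_at.

```ruby
# Hypothetical in-memory stand-in for DbJob, limited to the state checks.
class SketchJob
  attr_reader :state, :locked_by

  def initialize
    @state = :waiting
  end

  def assert_state!(*states)
    return if states.include?(state)

    raise "Job is not in state #{states.inspect} but in state #{state.inspect}."
  end

  def mark_locked!(worker_id)
    raise "Job is already locked by #{locked_by.inspect}." if locked_by

    @locked_by = worker_id
    @state = :locked
  end

  def mark_started!
    assert_state! :locked
    @state = :started
  end

  def mark_succeeded!
    assert_state! :started
    @state = :succeeded
  end

  def mark_failed!
    assert_state! :locked, :started
    @state = :failed
  end

  def reset!(force = false)
    assert_state!(:succeeded, :failed) unless force
    @locked_by = nil
    @state = :waiting
  end
end

job = SketchJob.new
job.mark_locked!('worker-01.example.com.4242.a1b2c3') # hypothetical worker ID
job.mark_started!

# A started job is not in a final state, so a plain reset! is refused.
refused = begin
  job.reset!
  false
rescue RuntimeError
  true
end

job.mark_succeeded!
job.reset! # allowed: "succeeded" is a final state
puts job.state
```

Passing true to reset! skips the check entirely, which is why the description above recommends shutting down workers before a forced reset.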