Class: Workhorse::DbJob

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/workhorse/db_job.rb

Constant Summary collapse

STATE_WAITING =
:waiting
STATE_LOCKED =
:locked
STATE_STARTED =
:started
STATE_SUCCEEDED =
:succeeded
STATE_FAILED =
:failed

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.failedObject



31
32
33
# File 'lib/workhorse/db_job.rb', line 31

def self.failed
  where(state: STATE_FAILED)
end

.lockedObject



19
20
21
# File 'lib/workhorse/db_job.rb', line 19

def self.locked
  where(state: STATE_LOCKED)
end

.startedObject



23
24
25
# File 'lib/workhorse/db_job.rb', line 23

def self.started
  where(state: STATE_STARTED)
end

.succeededObject



27
28
29
# File 'lib/workhorse/db_job.rb', line 27

def self.succeeded
  where(state: STATE_SUCCEEDED)
end

.waitingObject



15
16
17
# File 'lib/workhorse/db_job.rb', line 15

def self.waiting
  where(state: STATE_WAITING)
end

Instance Method Details

#assert_state!(*states) ⇒ Object



117
118
119
120
121
# File 'lib/workhorse/db_job.rb', line 117

def assert_state!(*states)
  unless states.include?(state.to_sym)
    fail "Job #{id} is not in state #{states.inspect} but in state #{state.inspect}."
  end
end

#mark_failed!(exception) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/workhorse/db_job.rb', line 99

def mark_failed!(exception)
  assert_state! STATE_LOCKED, STATE_STARTED

  self.failed_at  = Time.now
  self.last_error = %(#{exception.message}\n#{exception.backtrace.join("\n")})
  self.state      = STATE_FAILED
  save!
end

#mark_locked!(worker_id) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/workhorse/db_job.rb', line 67

def mark_locked!(worker_id)
  if changed?
    fail "Dirty jobs can't be locked."
  end

  # TODO: Remove this debug output
  # if Workhorse::DbJob.lock.find(id).locked_at
  #   puts "Already locked (with FOR UPDATE)"
  # end

  if locked_at
    # TODO: Remove this debug output
    # puts "Already locked. Job: #{self.id} Worker: #{worker_id}"
    fail "Job #{id} is already locked by #{locked_by.inspect}."
  end

  self.locked_at = Time.now
  self.locked_by = worker_id
  self.state     = STATE_LOCKED
  save!
end

#mark_started!Object



90
91
92
93
94
95
96
# File 'lib/workhorse/db_job.rb', line 90

def mark_started!
  assert_state! STATE_LOCKED

  self.started_at = Time.now
  self.state      = STATE_STARTED
  save!
end

#mark_succeeded!Object



109
110
111
112
113
114
115
# File 'lib/workhorse/db_job.rb', line 109

def mark_succeeded!
  assert_state! STATE_STARTED

  self.succeeded_at = Time.now
  self.state        = STATE_SUCCEEDED
  save!
end

#reset!(force = false) ⇒ Object

Resets job to state “waiting” and clears all meta fields set by workhorse in course of processing this job.

This is only allowed if the job is in a final state (“succeeded” or “failed”), as only those jobs are safe to modify; workhorse will not touch these jobs. To reset a job without checking the state it is in, set “force” to true. Prior to doing so, ensure that the job is not still being processed by a worker. If possible, shut down all workers before performing a forced reset.

After the job is reset, it will be performed again. If you reset a job that has already been performed (“succeeded”) or partially performed (“failed”), make sure the actions performed in the job are repeatable or have been rolled back. E.g. if the job already wrote something to an external API, it may cause inconsistencies if the job is performed again.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/workhorse/db_job.rb', line 50

def reset!(force = false)
  unless force
    assert_state! STATE_SUCCEEDED, STATE_FAILED
  end

  self.state = STATE_WAITING
  self.locked_at = nil
  self.locked_by = nil
  self.started_at = nil
  self.succeeded_at = nil
  self.failed_at = nil
  self.last_error = nil

  save!
end