Class: Workhorse::DbJob

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/workhorse/db_job.rb

Constant Summary collapse

# State that reset! puts a job back into; per its documentation such a job
# will be performed again.
STATE_WAITING =
:waiting
# State set by mark_locked! together with locked_at / locked_by.
STATE_LOCKED =
:locked
# State set by mark_started!; only reachable from STATE_LOCKED.
STATE_STARTED =
:started
# State set by mark_succeeded!; only reachable from STATE_STARTED.
STATE_SUCCEEDED =
:succeeded
# State set by mark_failed!; reachable from STATE_LOCKED or STATE_STARTED.
STATE_FAILED =
:failed
# Splits a string into three captures separated by dots: a lazily matched
# prefix, a run of digits, and a final segment containing no dot.
# Presumably these are the host, pid and random parts of +locked_by+ that
# with_split_locked_by extracts in SQL - confirm against the callers.
# NOTE(review): ^ and $ anchor per line, not per string; \A / \z would anchor
# the whole string - confirm which is intended.
EXP_LOCKED_BY =
/^(.*?)\.(\d+?)\.([^.]+)$/.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.failed ⇒ Object



33
34
35
# File 'lib/workhorse/db_job.rb', line 33

# Selects the jobs whose +state+ equals STATE_FAILED.
#
# @return [Object] whatever +where+ returns for that condition
def self.failed
  conditions = { state: STATE_FAILED }
  where(conditions)
end

.locked ⇒ Object



21
22
23
# File 'lib/workhorse/db_job.rb', line 21

# Selects the jobs whose +state+ equals STATE_LOCKED.
#
# @return [Object] whatever +where+ returns for that condition
def self.locked
  conditions = { state: STATE_LOCKED }
  where(conditions)
end

.started ⇒ Object



25
26
27
# File 'lib/workhorse/db_job.rb', line 25

# Selects the jobs whose +state+ equals STATE_STARTED.
#
# @return [Object] whatever +where+ returns for that condition
def self.started
  conditions = { state: STATE_STARTED }
  where(conditions)
end

.succeeded ⇒ Object



29
30
31
# File 'lib/workhorse/db_job.rb', line 29

# Selects the jobs whose +state+ equals STATE_SUCCEEDED.
#
# @return [Object] whatever +where+ returns for that condition
def self.succeeded
  conditions = { state: STATE_SUCCEEDED }
  where(conditions)
end

.waiting ⇒ Object



17
18
19
# File 'lib/workhorse/db_job.rb', line 17

# Selects the jobs whose +state+ equals STATE_WAITING.
#
# @return [Object] whatever +where+ returns for that condition
def self.waiting
  conditions = { state: STATE_WAITING }
  where(conditions)
end

.with_split_locked_by ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/workhorse/db_job.rb', line 38

# Selects every column of this model's table plus three computed columns
# obtained by splitting +locked_by+ at its last two dots:
#
# * +locked_by_rnd+  - the part after the last dot,
# * +locked_by_pid+  - the part between the second-to-last and the last dot,
# * +locked_by_host+ - everything before the second-to-last dot.
#
# The split is done in SQL using +substring_index+ (a MySQL / MariaDB
# function).
#
# The host length is computed with +char_length+ rather than +length+:
# +substring+ counts characters while +length+ counts bytes, so with a
# multi-byte +locked_by+ the byte-based arithmetic would cut the host too long
# and include the separating dot.
#
# @return [Object] whatever +select+ returns for the generated column list
def self.with_split_locked_by
  select(<<~SQL)
    #{table_name}.*,

    -- random string
    substring_index(locked_by, '.', -1) as locked_by_rnd,

    -- pid
    substring_index(
      substring_index(locked_by, '.', -2),
      '.',
      1
    ) as locked_by_pid,

    -- get host
    substring(
      locked_by,
      1,
      char_length(locked_by) -
      char_length(substring_index(locked_by, '.', -2)) - 1
    ) as locked_by_host
  SQL
end

Instance Method Details

#assert_state!(*states) ⇒ Object



139
140
141
142
143
# File 'lib/workhorse/db_job.rb', line 139

# Verifies that the job currently is in one of the given states.
#
# @param states [Array<Symbol>] the states that are acceptable
# @raise [RuntimeError] if +state+ (converted to a symbol) is not among +states+
# @return [nil]
def assert_state!(*states)
  current = state.to_sym
  return if states.include?(current)

  raise "Job #{id} is not in state #{states.inspect} but in state #{state.inspect}."
end

#mark_failed!(exception) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/workhorse/db_job.rb', line 121

# Marks the job as failed and records the error that caused the failure.
#
# The job must be in STATE_LOCKED or STATE_STARTED. +failed_at+ is set to the
# current time, +last_error+ to the exception message followed by a newline and
# the backtrace (one frame per line), and +state+ to STATE_FAILED. The record
# is then persisted with +save!+.
#
# @param exception [Exception] the error that made the job fail
# @raise [RuntimeError] (via assert_state!) if the job is in another state
# @return [Object] the result of +save!+
def mark_failed!(exception)
  assert_state! STATE_LOCKED, STATE_STARTED

  # An exception that was instantiated but never raised has a nil backtrace;
  # Array() turns that into an empty list instead of failing on nil.join.
  backtrace = Array(exception.backtrace).join("\n")

  self.failed_at  = Time.now
  self.last_error = "#{exception.message}\n#{backtrace}"
  self.state      = STATE_FAILED
  save!
end

#mark_locked!(worker_id) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/workhorse/db_job.rb', line 94

# Locks the job for the given worker.
#
# Sets +locked_at+ to the current time, +locked_by+ to +worker_id+ and +state+
# to STATE_LOCKED, then persists the record with +save!+.
#
# @param worker_id [Object] identifier stored in +locked_by+
# @raise [RuntimeError] if the record has unsaved changes or +locked_at+ is
#   already set
# @return [Object] the result of +save!+
def mark_locked!(worker_id)
  raise "Dirty jobs can't be locked." if changed?
  raise "Job #{id} is already locked by #{locked_by.inspect}." if locked_at

  lock_time = Time.now
  self.locked_at = lock_time
  self.locked_by = worker_id
  self.state = STATE_LOCKED
  save!
end

#mark_started! ⇒ Object



112
113
114
115
116
117
118
# File 'lib/workhorse/db_job.rb', line 112

# Moves a locked job into STATE_STARTED.
#
# Sets +started_at+ to the current time and persists the record with +save!+.
#
# @raise [RuntimeError] (via assert_state!) unless the job is in STATE_LOCKED
# @return [Object] the result of +save!+
def mark_started!
  assert_state!(STATE_LOCKED)

  start_time = Time.now
  self.started_at = start_time
  self.state = STATE_STARTED
  save!
end

#mark_succeeded! ⇒ Object



131
132
133
134
135
136
137
# File 'lib/workhorse/db_job.rb', line 131

# Moves a started job into STATE_SUCCEEDED.
#
# Sets +succeeded_at+ to the current time and persists the record with +save!+.
#
# @raise [RuntimeError] (via assert_state!) unless the job is in STATE_STARTED
# @return [Object] the result of +save!+
def mark_succeeded!
  assert_state!(STATE_STARTED)

  finish_time = Time.now
  self.succeeded_at = finish_time
  self.state = STATE_SUCCEEDED
  save!
end

#reset!(force = false) ⇒ Object

Resets the job to state “waiting” and clears all meta fields set by workhorse in the course of processing this job.

This is only allowed if the job is in a final state (“succeeded” or “failed”), as only those jobs are safe to modify; workhorse will not touch these jobs. To reset a job without checking the state it is in, set “force” to true. Prior to doing so, ensure that the job is not still being processed by a worker. If possible, shut down all workers before performing a forced reset.

After the job is reset, it will be performed again. If you reset a job that has already been performed (“succeeded”) or partially performed (“failed”), make sure the actions performed in the job are repeatable or have been rolled back. E.g. if the job already wrote something to an external API, it may cause inconsistencies if the job is performed again.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/workhorse/db_job.rb', line 77

# Puts the job back into STATE_WAITING and clears the lock, timing and error
# fields, then persists the record with +save!+.
#
# @param force [Boolean] when true, the state check is skipped
# @raise [RuntimeError] (via assert_state!) if +force+ is falsy and the job is
#   neither in STATE_SUCCEEDED nor in STATE_FAILED
# @return [Object] the result of +save!+
def reset!(force = false)
  assert_state!(STATE_SUCCEEDED, STATE_FAILED) unless force

  self.state = STATE_WAITING

  # Clear the bookkeeping fields, in the same order as they are listed.
  %i[locked_at locked_by started_at succeeded_at failed_at last_error].each do |field|
    send("#{field}=", nil)
  end

  save!
end