Class: Burst::Job
Defined Under Namespace
Classes: Error
Constant Summary collapse
- SUSPEND =
'suspend'.freeze
Class Method Summary collapse
Instance Method Summary collapse
- #assign_default_values(hash_store) ⇒ Object
- #attributes ⇒ Object
- #configure ⇒ Object
- #current_timestamp ⇒ Object
-
#enqueue! ⇒ Object
Marks the job as enqueued when it is scheduled onto the queue.
- #enqueued? ⇒ Boolean
-
#fail! ⇒ Object
Marks the job as failed when its execution fails.
- #failed? ⇒ Boolean
-
#finish! ⇒ Object
Marks the job as finished when it finishes performing.
- #finished? ⇒ Boolean
- #initial? ⇒ Boolean
-
#initialize(workflow, hash_store = {}) ⇒ Job
constructor
A new instance of Job.
- #parents_succeeded? ⇒ Boolean
-
#perform ⇒ Object
execute this code by ActiveJob.
- #ready_to_start? ⇒ Boolean
- #reload ⇒ Object
-
#resume(data) ⇒ Object
execute this code when resumes after suspending.
-
#resume! ⇒ Object
mark job as resumed.
- #resumed? ⇒ Boolean
- #run(klass, opts = {}) ⇒ Object
- #running? ⇒ Boolean
- #save! ⇒ Object
-
#set_output(data) ⇒ Object
store data to be available for next jobs.
-
#start! ⇒ Object
Marks the job as started when it starts performing.
- #started? ⇒ Boolean
- #succeeded? ⇒ Boolean
-
#suspend ⇒ Object
mark execution as suspended.
-
#suspend! ⇒ Object
mark job as suspended.
- #suspended? ⇒ Boolean
Constructor Details
#initialize(workflow, hash_store = {}) ⇒ Job
Returns a new instance of Job.
12 13 14 15 |
# File 'lib/burst/job.rb', line 12
# Builds a job that belongs to +workflow+.
#
# @param workflow [Object] stored in @workflow for later use by this job
# @param hash_store [Hash] initial attribute values; passed straight to
#   #assign_default_values, which fills in anything missing
def initialize(workflow, hash_store = {})
  @workflow = workflow

  assign_default_values(hash_store)
end
Class Method Details
.from_hash(workflow, hash_store) ⇒ Object
39 40 41 |
# File 'lib/burst/job.rb', line 39
# Rebuilds a job from a stored hash by instantiating the class named in
# hash_store[:klass].
#
# NOTE(review): +constantize+ is presumably ActiveSupport's String#constantize;
# confirm that hash_store[:klass] only ever holds trusted class names.
#
# @param workflow [Object] workflow handed to the job constructor
# @param hash_store [Hash] stored attributes; must contain :klass
# @return [Object] a new instance of the named class
def self.from_hash(workflow, hash_store)
  job_class = hash_store[:klass].constantize
  job_class.new(workflow, hash_store)
end
Instance Method Details
#assign_default_values(hash_store) ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/burst/job.rb', line 17
# Loads the job's attributes from +hash_store+ and fills in any that are unset.
#
# A deep copy of the hash is handed to +set_model+, so the caller's hash is not
# the object passed on. Afterwards each of id, workflow_id, klass, incoming and
# outgoing is given a default only if it is currently nil/false.
#
# @param hash_store [Hash] attribute values to load
def assign_default_values(hash_store)
  snapshot = hash_store.deep_dup
  set_model(snapshot)

  # Identity defaults.
  self.id ||= SecureRandom.uuid
  self.workflow_id ||= @workflow.id
  self.klass ||= self.class.to_s

  # Dependency-list defaults; the final assignment is the method's return value.
  self.incoming ||= []
  self.outgoing ||= []
end
#attributes ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/burst/job.rb', line 77
# Snapshot of the job's attributes as a symbol-keyed hash.
#
# Each key is read through the accessor of the same name; keys appear in the
# order listed below.
#
# @return [Hash{Symbol => Object}]
def attributes
  fields = %i[
    workflow_id id klass params incoming outgoing output
    started_at enqueued_at finished_at failed_at suspended_at resumed_at
  ]

  fields.each_with_object({}) do |field, snapshot|
    snapshot[field] = send(field)
  end
end
#configure ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/burst/job.rb', line 61
# Runs the given block while holding the workflow lock, then persists the result.
#
# After the block returns, still inside the lock, the workflow resolves its
# dependencies and is saved, every job from +all_jobs+ is saved, and this job
# is reloaded.
#
# @yield the configuration code; a block is required (it is yielded to
#   unconditionally)
def configure
  @workflow.with_lock do
    yield

    @workflow.resolve_dependencies
    @workflow.save!

    jobs = @workflow.all_jobs.to_a
    jobs.each { |job| job.save! }

    reload
  end
end
#current_timestamp ⇒ Object
176 177 178 |
# File 'lib/burst/job.rb', line 176
# Current wall-clock time as whole seconds since the Unix epoch.
#
# @return [Integer]
def current_timestamp
  Time.now.to_i
end
#enqueue! ⇒ Object
Marks the job as enqueued when it is scheduled onto the queue.
96 97 98 99 100 101 102 103 104 |
# File 'lib/burst/job.rb', line 96
# Marks the job as enqueued.
#
# Records the enqueue time and clears every other lifecycle timestamp.
#
# @raise [Error] if the job is already enqueued
def enqueue!
  raise Error, 'Already enqueued' if enqueued?

  self.enqueued_at = current_timestamp

  # Clear the remaining lifecycle timestamps.
  self.started_at = nil
  self.finished_at = nil
  self.failed_at = nil
  self.suspended_at = nil
  self.resumed_at = nil
end
#enqueued? ⇒ Boolean
136 137 138 |
# File 'lib/burst/job.rb', line 136
# Whether an enqueue time has been recorded.
#
# @return [Boolean] true when +enqueued_at+ is not nil
def enqueued?
  return false if enqueued_at.nil?

  true
end
#fail! ⇒ Object
Marks the job as failed when its execution fails.
119 120 121 122 |
# File 'lib/burst/job.rb', line 119
# Marks the job as failed.
#
# A failed job is also treated as finished: both +failed_at+ and +finished_at+
# receive the same timestamp.
#
# @raise [Error] if the job has already failed
def fail!
  raise Error, 'Already failed' if failed?

  self.finished_at = self.failed_at = current_timestamp
end
#failed? ⇒ Boolean
152 153 154 |
# File 'lib/burst/job.rb', line 152
# Whether a failure time has been recorded.
#
# @return [Boolean] true when +failed_at+ is not nil
def failed?
  return false if failed_at.nil?

  true
end
#finish! ⇒ Object
Marks the job as finished when it finishes performing.
113 114 115 116 |
# File 'lib/burst/job.rb', line 113
# Marks the job as finished by recording the finish time.
#
# @raise [Error] if the job is already finished
def finish!
  raise Error, 'Already finished' if finished?

  self.finished_at = current_timestamp
end
#finished? ⇒ Boolean
144 145 146 |
# File 'lib/burst/job.rb', line 144
# Whether a finish time has been recorded.
#
# @return [Boolean] true when +finished_at+ is not nil
def finished?
  return false if finished_at.nil?

  true
end
#initial? ⇒ Boolean
172 173 174 |
# File 'lib/burst/job.rb', line 172
# Whether the job has no incoming dependencies.
#
# @return [Boolean] true when +incoming+ is empty
def initial?
  dependencies = incoming
  dependencies.empty?
end
#parents_succeeded? ⇒ Boolean
180 181 182 183 184 |
# File 'lib/burst/job.rb', line 180
# Whether every job listed in +incoming+ has succeeded.
#
# Each id is looked up through the workflow; checking stops at the first
# parent that has not succeeded. True when +incoming+ is empty.
#
# @return [Boolean]
def parents_succeeded?
  incoming.all? do |parent_id|
    parent = @workflow.get_job(parent_id)
    parent.succeeded?
  end
end
#perform ⇒ Object
Executed by ActiveJob. To suspend the job, return Burst::Job::SUSPEND or call the suspend method.
44 |
# File 'lib/burst/job.rb', line 44
# The job's work. The base implementation is empty and returns nil;
# subclasses are presumably expected to override it.
def perform
end
#ready_to_start? ⇒ Boolean
168 169 170 |
# File 'lib/burst/job.rb', line 168
# Whether the job can be started now.
#
# A job is ready when it is not running, enqueued, finished or failed, and
# all of its parents have succeeded. The state checks run in that order and
# stop at the first one that rules the job out.
#
# @return [Boolean]
def ready_to_start?
  return false if running? || enqueued? || finished? || failed?

  parents_succeeded?
end
#reload ⇒ Object
27 28 29 |
# File 'lib/burst/job.rb', line 27
# Refreshes this job's attributes from the hash the workflow holds for its id.
def reload
  stored = @workflow.get_job_hash(id)
  assign_default_values(stored)
end
#resume(data) ⇒ Object
execute this code when resumes after suspending
47 48 49 |
# File 'lib/burst/job.rb', line 47
# Hook that runs when the job resumes after being suspended.
#
# The base implementation stores +data+ as the job's output.
#
# @param data [Object] payload supplied on resume
def resume(data)
  set_output(data)
end
#resume! ⇒ Object
mark job as resumed
130 131 132 133 134 |
# File 'lib/burst/job.rb', line 130
# Marks the job as resumed by recording the resume time.
#
# @raise [Error] if the job is not currently suspended
# @raise [Error] if the job has already been resumed
def resume!
  raise Error, 'Not suspended ' unless suspended?
  raise Error, 'Already resumed ' if resumed?

  self.resumed_at = current_timestamp
end
#resumed? ⇒ Boolean
160 161 162 |
# File 'lib/burst/job.rb', line 160
# Whether a resume time has been recorded.
#
# @return [Boolean] true when +resumed_at+ is not nil
def resumed?
  return false if resumed_at.nil?

  true
end
#run(klass, opts = {}) ⇒ Object
71 72 73 74 75 |
# File 'lib/burst/job.rb', line 71
# Schedules +klass+ in the workflow so that it runs after this job and before
# this job's current outgoing jobs.
#
# This job's id is appended to opts[:after] and its outgoing ids to
# opts[:before] (duplicates removed). The combined options are passed on in a
# new hash, so the caller's +opts+ is left untouched and may be frozen.
#
# @param klass [Class] job class to schedule
# @param opts [Hash] options forwarded to the workflow's +run+; :after and
#   :before may each be a single value or an array
# @return [Object] whatever the workflow's +run+ returns
def run(klass, opts = {})
  after = [*opts[:after], id].uniq
  before = [*opts[:before], *outgoing].uniq

  @workflow.run(klass, opts.merge(after: after, before: before))
end
#running? ⇒ Boolean
148 149 150 |
# File 'lib/burst/job.rb', line 148
# Whether the job has started and is neither finished nor suspended.
#
# @return [Boolean]
def running?
  return false unless started?

  !(finished? || suspended?)
end
#save! ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/burst/job.rb', line 31
# Persists this job through its workflow while holding the workflow lock.
#
# Inside the lock the job is handed to the workflow's +set_job+, the workflow
# is saved, and then the optional block is run.
#
# @yield optional extra work to perform inside the lock after saving
def save!
  @workflow.with_lock do
    @workflow.set_job(self)
    @workflow.save!

    if block_given?
      yield
    end
  end
end
#set_output(data) ⇒ Object
store data to be available for next jobs
52 53 54 |
# File 'lib/burst/job.rb', line 52
# Stores +data+ as this job's output so it is available to the next jobs.
#
# @param data [Object] value assigned to +output+
# @return [Object] the assigned value
def set_output(data)
  self.output = data
end
#start! ⇒ Object
Marks the job as started when it starts performing.
107 108 109 110 |
# File 'lib/burst/job.rb', line 107
# Marks the job as started by recording the start time.
#
# @raise [Error] if the job has already started
def start!
  raise Error, 'Already started' if started?

  self.started_at = current_timestamp
end
#started? ⇒ Boolean
140 141 142 |
# File 'lib/burst/job.rb', line 140
# Whether a start time has been recorded.
#
# @return [Boolean] true when +started_at+ is not nil
def started?
  return false if started_at.nil?

  true
end
#succeeded? ⇒ Boolean
164 165 166 |
# File 'lib/burst/job.rb', line 164
# Whether the job finished without failing.
#
# @return [Boolean]
def succeeded?
  return false unless finished?

  !failed?
end
#suspend ⇒ Object
mark execution as suspended
57 58 59 |
# File 'lib/burst/job.rb', line 57
# Requests suspension by storing the SUSPEND marker as the job's output.
#
# @return [Object] whatever +set_output+ returns
def suspend
  set_output(SUSPEND)
end
#suspend! ⇒ Object
mark job as suspended
125 126 127 |
# File 'lib/burst/job.rb', line 125
# Marks the job as suspended by recording the suspension time.
#
# Unlike the other state-marking methods, this one does not guard against
# being called twice; a repeat call overwrites the earlier timestamp.
def suspend!
  self.suspended_at = current_timestamp
end
#suspended? ⇒ Boolean
156 157 158 |
# File 'lib/burst/job.rb', line 156
# Whether the job is currently suspended: a suspension time has been recorded
# and the job has not been resumed.
#
# @return [Boolean]
def suspended?
  return false if suspended_at.nil?

  !resumed?
end