Class: Burstflow::Workflow
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Burstflow::Workflow
show all
- Includes:
- Callbacks, Configuration
- Defined in:
- lib/burstflow/workflow.rb
Defined Under Namespace
Modules: Callbacks, Configuration
Classes: Builder, InternalError
Constant Summary
collapse
- INITIAL =
'initial'.freeze
- RUNNING =
'running'.freeze
- FINISHED =
'finished'.freeze
- FAILED =
'failed'.freeze
- SUSPENDED =
'suspended'.freeze
- STATUSES =
[INITIAL, RUNNING, FINISHED, FAILED, SUSPENDED].freeze
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#cache ⇒ Object
Returns the value of attribute cache.
25
26
27
|
# File 'lib/burstflow/workflow.rb', line 25
def cache
@cache
end
|
#manager ⇒ Object
Returns the value of attribute manager.
25
26
27
|
# File 'lib/burstflow/workflow.rb', line 25
def manager
@manager
end
|
Class Method Details
.build(*args) ⇒ Object
55
56
57
58
59
60
|
# File 'lib/burstflow/workflow.rb', line 55
def self.build(*args)
new.tap do |wf|
builder = Burstflow::Workflow::Builder.new(wf, *args, &configuration)
wf.flow = {'jobs_config' => builder.as_json}
end
end
|
Instance Method Details
#add_error(job_orexception) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/burstflow/workflow.rb', line 101
def add_error job_orexception
context = {
created_at: Time.now.to_i
}
if job_orexception.is_a?(::Exception)
context[:message] = job_orexception.message
context[:klass] = job_orexception.class.to_s
context[:backtrace] = job_orexception.backtrace.first(10)
context[:cause] = job_orexception.cause
else
context[:job] = job_orexception.id
end
failures.push(context)
end
|
#attributes ⇒ Object
45
46
47
48
49
50
51
52
53
|
# File 'lib/burstflow/workflow.rb', line 45
def attributes
{
id: self.id,
jobs_config: self.jobs_config,
type: self.class.to_s,
status: status,
failures: failures
}
end
|
#complete! ⇒ Object
131
132
133
134
135
136
137
138
139
|
# File 'lib/burstflow/workflow.rb', line 131
def complete!
if has_errors?
failed!
elsif has_suspended_jobs?
suspended!
else
finished!
end
end
|
#failed! ⇒ Object
165
166
167
168
169
170
171
172
173
|
# File 'lib/burstflow/workflow.rb', line 165
def failed!
run_callbacks :failure do
raise InternalError.new(self, "Can't fail: workflow already failed") if failed?
raise InternalError.new(self, "Can't fail: workflow already finished") if finished?
raise InternalError.new(self, "Can't fail: workflow in not runnig") if !(running? || suspended?)
self.status = FAILED
save!
end
end
|
#finished! ⇒ Object
175
176
177
178
179
180
181
182
183
|
# File 'lib/burstflow/workflow.rb', line 175
def finished!
run_callbacks :finish do
raise InternalError.new(self, "Can't finish: workflow already finished") if finished?
raise InternalError.new(self, "Can't finish: workflow already failed") if failed?
raise InternalError.new(self, "Can't finish: workflow in not runnig") if !running?
self.status = FINISHED
save!
end
end
|
#finished_at ⇒ Object
153
154
155
|
# File 'lib/burstflow/workflow.rb', line 153
def finished_at
last_job&.finished_at
end
|
#first_job ⇒ Object
141
142
143
|
# File 'lib/burstflow/workflow.rb', line 141
def first_job
all_jobs.min_by{|n| n.started_at || Time.now.to_i }
end
|
#has_errors? ⇒ Boolean
117
118
119
|
# File 'lib/burstflow/workflow.rb', line 117
def has_errors?
failures.any?
end
|
#has_scheduled_jobs? ⇒ Boolean
121
122
123
124
125
|
# File 'lib/burstflow/workflow.rb', line 121
def has_scheduled_jobs?
cache[:has_scheduled_jobs] ||= jobs.any? do |job|
job.scheduled? || (job.initial? && !job.enqueued?)
end
end
|
#has_suspended_jobs? ⇒ Boolean
127
128
129
|
# File 'lib/burstflow/workflow.rb', line 127
def has_suspended_jobs?
cache[:has_suspended_jobs] ||= jobs.any?(&:suspended?)
end
|
#initial_jobs ⇒ Object
97
98
99
|
# File 'lib/burstflow/workflow.rb', line 97
def initial_jobs
cache[:initial_jobs] ||= jobs.select(&:initial?)
end
|
#job(id) ⇒ Object
89
90
91
|
# File 'lib/burstflow/workflow.rb', line 89
def job(id)
Burstflow::Job.from_hash(self, job_hash(id))
end
|
#job_hash(id) ⇒ Object
85
86
87
|
# File 'lib/burstflow/workflow.rb', line 85
def job_hash(id)
jobs_config[id].deep_dup
end
|
#jobs ⇒ Object
77
78
79
80
81
82
83
|
# File 'lib/burstflow/workflow.rb', line 77
def jobs
Enumerator.new do |y|
jobs_config.keys.each do |id|
y << job(id)
end
end
end
|
#last_job ⇒ Object
145
146
147
|
# File 'lib/burstflow/workflow.rb', line 145
def last_job
all_jobs.max_by{|n| n.finished_at || 0 } if finished?
end
|
#reload ⇒ Object
62
63
64
65
|
# File 'lib/burstflow/workflow.rb', line 62
def reload(*)
self.cache = {}
super
end
|
#resume!(job_id, data) ⇒ Object
72
73
74
75
|
# File 'lib/burstflow/workflow.rb', line 72
def resume!(job_id, data)
manager.resume_workflow!(job_id, data)
self
end
|
#resumed! ⇒ Object
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/burstflow/workflow.rb', line 195
def resumed!
run_callbacks :resume do
raise InternalError.new(self, "Can't resume: workflow already running") if running?
raise InternalError.new(self, "Can't resume: workflow already finished") if finished?
raise InternalError.new(self, "Can't resume: workflow already failed") if failed?
raise InternalError.new(self, "Can't resume: workflow in not suspended") if !suspended?
self.status = RUNNING
save!
end
end
|
#runnig! ⇒ Object
157
158
159
160
161
162
163
|
# File 'lib/burstflow/workflow.rb', line 157
def runnig!
raise InternalError.new(self, "Can't start: workflow already running") if (running? || suspended?)
raise InternalError.new(self, "Can't start: workflow already failed") if failed?
raise InternalError.new(self, "Can't start: workflow already finished") if finished?
self.status = RUNNING
save!
end
|
#set_job(job) ⇒ Object
93
94
95
|
# File 'lib/burstflow/workflow.rb', line 93
def set_job(job)
jobs_config[job.id] = job.as_json
end
|
#start! ⇒ Object
67
68
69
70
|
# File 'lib/burstflow/workflow.rb', line 67
def start!
manager.start_workflow!
self
end
|
#started_at ⇒ Object
149
150
151
|
# File 'lib/burstflow/workflow.rb', line 149
def started_at
first_job&.started_at
end
|
#suspended! ⇒ Object
185
186
187
188
189
190
191
192
193
|
# File 'lib/burstflow/workflow.rb', line 185
def suspended!
run_callbacks :suspend do
raise InternalError.new(self, "Can't suspend: workflow already finished") if finished?
raise InternalError.new(self, "Can't suspend: workflow already failed") if failed?
raise InternalError.new(self, "Can't suspend: workflow in not runnig") if !running?
self.status = SUSPENDED
save!
end
end
|