Class: Tasker::WorkflowStep
Defined Under Namespace
Classes: StepFinder
Constant Summary
collapse
- PROVIDES_EDGE_NAME =
'provides'
Class Method Summary
collapse
Instance Method Summary
collapse
configure_database_connections, database_configuration_exists?
Class Method Details
.build_default_step!(task, template, named_step) ⇒ Object
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
|
# File 'app/models/tasker/workflow_step.rb', line 300
# Builds and persists a workflow step for +task+ from a step template.
#
# The new step starts unprocessed, not in process, with zero attempts and
# empty results. Retry and skip settings are copied from the template and
# the inputs are taken from the task's context.
#
# @param task [#task_id, #context] task the step belongs to
# @param template [#default_retryable, #default_retry_limit, #skippable] source of the step defaults
# @param named_step [#named_step_id] named step the new record references
# @return [Object] the saved step
# @raise whatever +save!+ raises when the record cannot be persisted
def self.build_default_step!(task, template, named_step)
  new(
    task_id: task.task_id,
    named_step_id: named_step.named_step_id,
    retryable: template.default_retryable,
    retry_limit: template.default_retry_limit,
    skippable: template.skippable,
    in_process: false,
    inputs: task.context,
    processed: false,
    attempts: 0,
    results: {}
  ).tap(&:save!)
end
|
.by_current_state ⇒ ActiveRecord::Relation
Scopes workflow steps by their current state using state machine transitions
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'app/models/tasker/workflow_step.rb', line 88
# Scopes workflow steps by their current state, read from each step's
# transition rows.
#
# The joined subquery keeps a single transition per step (rows flagged
# most_recent, highest sort_key first; DISTINCT ON is PostgreSQL syntax) and
# exposes it under the alias current_transitions. Because the join is an
# INNER JOIN, steps without such a transition row are not returned.
#
# @param state [String, nil] state to filter on; when blank the joined
#   relation is returned without a state filter
# @return [ActiveRecord::Relation]
scope :by_current_state, lambda { |state = nil|
  relation = joins(<<~SQL.squish)
    INNER JOIN (
      SELECT DISTINCT ON (workflow_step_id) workflow_step_id, to_state
      FROM tasker_workflow_step_transitions
      WHERE most_recent = true
      ORDER BY workflow_step_id, sort_key DESC
    ) current_transitions ON current_transitions.workflow_step_id = tasker_workflow_steps.workflow_step_id
  SQL

  # The filter must live outside the SQL fragment so it is applied by
  # ActiveRecord rather than being sent to the database as text.
  if state.present?
    relation.where(current_transitions: { to_state: state })
  else
    relation
  end
}
|
.completed_since ⇒ ActiveRecord::Relation
Scopes workflow steps completed since a specific time
110
111
112
113
114
|
# File 'app/models/tasker/workflow_step.rb', line 110
# Scopes workflow steps whose most recent transition moved them to
# 'complete' and was created after +since_time+.
#
# @param since_time [Time] exclusive lower bound on the transition's created_at
# @return [ActiveRecord::Relation]
scope :completed_since, ->(since_time) do
  latest_complete = joins(:workflow_step_transitions).where(
    'tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?',
    true,
    'complete'
  )
  latest_complete.where('tasker_workflow_step_transitions.created_at > ?', since_time)
end
|
.failed_since ⇒ ActiveRecord::Relation
Scopes workflow steps that failed since a specific time
121
122
123
124
125
|
# File 'app/models/tasker/workflow_step.rb', line 121
# Scopes workflow steps whose most recent transition moved them to 'error'
# and was created after +since_time+.
#
# @param since_time [Time] exclusive lower bound on the transition's created_at
# @return [ActiveRecord::Relation]
scope :failed_since, ->(since_time) do
  latest_error = joins(:workflow_step_transitions).where(
    'tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?',
    true,
    'error'
  )
  latest_error.where('tasker_workflow_step_transitions.created_at > ?', since_time)
end
|
.find_step_by_name(steps, name) ⇒ WorkflowStep?
Finds a WorkflowStep with the given name by traversing the DAG efficiently
195
196
197
|
# File 'app/models/tasker/workflow_step.rb', line 195
# Looks up a step by name among +steps+ by delegating to
# StepFinder.find_by_name.
#
# @param steps [Object] passed through unchanged to StepFinder
# @param name [Object] step name to look for, passed through unchanged
# @return [WorkflowStep, nil] per this file's summary; the actual value is
#   whatever StepFinder.find_by_name returns
def self.find_step_by_name(steps, name)
StepFinder.find_by_name(steps, name)
end
|
.for_tasks_since ⇒ ActiveRecord::Relation
Scopes workflow steps for tasks created since a specific time
132
133
134
|
# File 'app/models/tasker/workflow_step.rb', line 132
# Scopes workflow steps to those whose task was created after +since_time+.
#
# @param since_time [Time] exclusive lower bound on tasker_tasks.created_at
# @return [ActiveRecord::Relation]
scope :for_tasks_since, ->(since_time) do
  with_tasks = joins(:task)
  with_tasks.where('tasker_tasks.created_at > ?', since_time)
end
|
.get_steps_for_task(task, templates) ⇒ Object
272
273
274
275
276
277
278
279
280
281
282
283
|
# File 'app/models/tasker/workflow_step.rb', line 272
# Returns one workflow step per template for +task+, creating any that do
# not exist yet, then wires up the dependency edges between them.
#
# For each template this: picks the named step with the same name,
# associates it with the task's named task, and reuses the existing step for
# (task, named step) or builds a default one.
#
# @param task [#task_id, #named_task] task whose steps are requested
# @param templates [Enumerable] step templates; each responds to +name+
# @return [Object] the value returned by set_up_dependent_steps
def self.get_steps_for_task(task, templates)
  named_steps = NamedStep.create_named_steps_from_templates(templates)

  steps = templates.map do |template|
    named_step = named_steps.find { |candidate| template.name == candidate.name }
    NamedTasksNamedStep.associate_named_step_with_named_task(task.named_task, template, named_step)

    existing = where(task_id: task.task_id, named_step_id: named_step.named_step_id).first
    existing || build_default_step!(task, template, named_step)
  end

  set_up_dependent_steps(steps, templates)
end
|
.get_viable_steps(task, sequence) ⇒ Object
326
327
328
329
330
331
332
333
334
335
336
337
|
# File 'app/models/tasker/workflow_step.rb', line 326
# Returns the steps of +sequence+ that are currently ready for execution.
#
# Readiness is read from StepReadinessStatus.for_task for the sequence's
# step ids; only statuses with a truthy +ready_for_execution+ are kept.
#
# @param task [#task_id] task the sequence belongs to
# @param sequence [#steps] sequence whose steps respond to +workflow_step_id+
# @return [Object] WorkflowStep query for the ready ids, with :named_step included
def self.get_viable_steps(task, sequence)
  candidate_ids = sequence.steps.map(&:workflow_step_id)
  statuses = StepReadinessStatus.for_task(task.task_id, candidate_ids)

  ready_ids = statuses.each_with_object([]) do |status, ids|
    ids << status.workflow_step_id if status.ready_for_execution
  end

  WorkflowStep.where(workflow_step_id: ready_ids).includes(:named_step)
end
|
.set_up_dependent_steps(steps, templates) ⇒ Object
285
286
287
288
289
290
291
292
293
294
295
296
297
298
|
# File 'app/models/tasker/workflow_step.rb', line 285
# Ensures a 'provides' edge exists from every dependency step to the step
# that depends on it.
#
# Templates without dependencies are skipped. Steps are matched to templates
# and dependencies by +name+. An edge is only added when the provider does
# not already have an outgoing edge to the dependent step.
#
# @param steps [Enumerable] steps responding to +name+, +outgoing_edges+ and +add_provides_edge!+
# @param templates [Enumerable] templates responding to +name+ and +all_dependencies+
# @return [Enumerable] the +steps+ argument
def self.set_up_dependent_steps(steps, templates)
  templates.each do |template|
    next if template.all_dependencies.empty?

    dependent = steps.find { |candidate| candidate.name == template.name }

    template.all_dependencies.each do |dependency_name|
      provider = steps.find { |candidate| candidate.name == dependency_name }
      next if provider.outgoing_edges.exists?(to_step: dependent)

      provider.add_provides_edge!(dependent)
    end
  end

  steps
end
|
.task_completion_stats(task) ⇒ Hash
Efficient method to get task completion statistics using ActiveRecord scopes. This avoids the N+1 query problem while working with the state machine system.
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'app/models/tasker/workflow_step.rb', line 141
# Summarises how many of a task's steps are complete, failed or still pending.
#
# Counts come from the +for_task+ relation and its +completed+ / +failed+
# sub-relations. Pending is everything that is neither completed nor failed.
#
# @param task [Object] task passed to +for_task+
# @return [Hash] with keys :total_steps, :completed_steps, :failed_steps,
#   :pending_steps, :latest_completion_time (maximum processed_at among
#   completed steps) and :all_complete (true only when there is at least one
#   step and every step is completed)
def self.task_completion_stats(task)
  scoped = for_task(task)
  total = scoped.count

  completed = scoped.completed
  failed = scoped.failed
  done = completed.count
  errored = failed.count

  {
    total_steps: total,
    completed_steps: done,
    failed_steps: errored,
    pending_steps: total - done - errored,
    latest_completion_time: completed.maximum(:processed_at),
    all_complete: done == total && total.positive?
  }
end
|
Instance Method Details
#add_provides_edge!(to_step) ⇒ Object
339
340
341
|
# File 'app/models/tasker/workflow_step.rb', line 339
# Creates an outgoing edge from this step to +to_step+, named with
# PROVIDES_EDGE_NAME.
#
# @param to_step [Object] step the edge points to
# @return [Object] whatever +outgoing_edges.create!+ returns
# @raise whatever +create!+ raises when the edge cannot be created
def add_provides_edge!(to_step)
outgoing_edges.create!(to_step: to_step, name: PROVIDES_EDGE_NAME)
end
|
#can_retry_now? ⇒ Boolean
420
421
422
423
424
425
426
427
|
# File 'app/models/tasker/workflow_step.rb', line 420
# Whether the step may be retried right now: it must be in error, still
# retry eligible, and not waiting for a backoff period.
#
# @return [Boolean]
def can_retry_now?
  return false unless in_error? && retry_eligible?

  waiting_for_backoff? ? false : true
end
|
#cancelled? ⇒ Boolean
371
372
373
374
|
# File 'app/models/tasker/workflow_step.rb', line 371
# Whether the readiness status reports the CANCELLED state. A step without
# a readiness status is not cancelled.
#
# @return [Boolean]
def cancelled?
  current = step_readiness_status&.current_state
  current == Constants::WorkflowStepStatuses::CANCELLED
end
|
#complete? ⇒ Boolean
348
349
350
351
352
353
354
|
# File 'app/models/tasker/workflow_step.rb', line 348
# Whether the readiness status reports COMPLETE or RESOLVED_MANUALLY. A step
# without a readiness status or current state is not complete.
#
# @return [Boolean]
def complete?
  current = step_readiness_status&.current_state
  return false if current.nil?

  finished_states = [
    Constants::WorkflowStepStatuses::COMPLETE,
    Constants::WorkflowStepStatuses::RESOLVED_MANUALLY
  ]
  finished_states.include?(current)
end
|
#dependencies_satisfied? ⇒ Boolean
Function-based predicate methods
389
390
391
392
|
# File 'app/models/tasker/workflow_step.rb', line 389
# Whether the readiness status reports the step's dependencies as satisfied.
# Returns false when there is no readiness status or the flag is unset.
#
# @return [Boolean]
def dependencies_satisfied?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.dependencies_satisfied || false
end
|
#has_retry_attempts? ⇒ Boolean
399
400
401
402
|
# File 'app/models/tasker/workflow_step.rb', line 399
# Whether the readiness status records at least one attempt. A missing
# status or missing attempt count is treated as zero attempts.
#
# @return [Boolean]
def has_retry_attempts?
  attempt_count = step_readiness_status&.attempts || 0
  attempt_count.positive?
end
|
#in_error? ⇒ Boolean
366
367
368
369
|
# File 'app/models/tasker/workflow_step.rb', line 366
# Whether the readiness status reports the ERROR state. A step without a
# readiness status is not in error.
#
# @return [Boolean]
def in_error?
  current = step_readiness_status&.current_state
  current == Constants::WorkflowStepStatuses::ERROR
end
|
#in_progress? ⇒ Boolean
356
357
358
359
|
# File 'app/models/tasker/workflow_step.rb', line 356
# Whether the readiness status reports the IN_PROGRESS state. A step without
# a readiness status is not in progress.
#
# @return [Boolean]
def in_progress?
  current = step_readiness_status&.current_state
  current == Constants::WorkflowStepStatuses::IN_PROGRESS
end
|
#leaf_step? ⇒ Boolean
434
435
436
437
|
# File 'app/models/tasker/workflow_step.rb', line 434
# Whether the step has no children according to its DAG relationship.
#
# Always returns a strict boolean: when the relationship or its child count
# is unavailable the answer is false rather than nil, matching the other
# predicates in this class that end in `|| false`.
#
# @return [Boolean]
def leaf_step?
  step_dag_relationship&.child_count&.zero? || false
end
|
#pending? ⇒ Boolean
361
362
363
364
|
# File 'app/models/tasker/workflow_step.rb', line 361
# Whether the readiness status reports the PENDING state. A step without a
# readiness status is reported as not pending by this predicate.
#
# @return [Boolean]
def pending?
  current = step_readiness_status&.current_state
  current == Constants::WorkflowStepStatuses::PENDING
end
|
#ready? ⇒ Boolean
383
384
385
386
|
# File 'app/models/tasker/workflow_step.rb', line 383
# Whether the readiness status marks the step as ready for execution.
# Returns false when there is no readiness status or the flag is unset.
#
# @return [Boolean]
def ready?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.ready_for_execution || false
end
|
#ready_status? ⇒ Boolean
376
377
378
379
380
381
|
# File 'app/models/tasker/workflow_step.rb', line 376
# Whether the step's current state is outside the list of unready states.
# When no readiness status or state is available, PENDING is assumed.
#
# @return [Boolean]
def ready_status?
  state = step_readiness_status&.current_state || Constants::WorkflowStepStatuses::PENDING
  !Constants::UNREADY_WORKFLOW_STEP_STATUSES.include?(state)
end
|
#reload ⇒ Object
439
440
441
442
443
444
|
# File 'app/models/tasker/workflow_step.rb', line 439
# Reloads the record and drops the memoized readiness status so the next
# predicate call fetches it again.
#
# Accepts and forwards the optional +options+ argument of the inherited
# +reload+. Callers that pass options (ActiveRecord's +lock!+ calls
# +reload(lock: ...)+) would otherwise hit an ArgumentError on a zero-arity
# override.
#
# @param options [Hash, nil] forwarded unchanged to the inherited reload
# @return [Object] whatever the inherited reload returns
def reload(options = nil)
  # Bare super forwards +options+ exactly as received.
  super.tap { @step_readiness_status = nil }
end
|
#retry_eligible? ⇒ Boolean
394
395
396
397
|
# File 'app/models/tasker/workflow_step.rb', line 394
# Whether the readiness status marks the step as eligible for retry.
# Returns false when there is no readiness status or the flag is unset.
#
# @return [Boolean]
def retry_eligible?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.retry_eligible || false
end
|
#retry_exhausted? ⇒ Boolean
404
405
406
407
408
409
410
411
|
# File 'app/models/tasker/workflow_step.rb', line 404
# Whether the recorded attempts have reached the retry limit.
#
# Returns false when there is no readiness status. A missing attempt count
# is treated as 0 and a missing retry limit as 3.
#
# @return [Boolean]
def retry_exhausted?
  readiness = step_readiness_status
  return false unless readiness

  used = readiness.attempts || 0
  allowed = readiness.retry_limit || 3
  used >= allowed
end
|
#root_step? ⇒ Boolean
429
430
431
432
|
# File 'app/models/tasker/workflow_step.rb', line 429
# Whether the step has no parents according to its readiness status. A
# missing status or missing parent count is treated as zero parents, so
# such a step is reported as a root.
#
# @return [Boolean]
def root_step?
  parent_count = step_readiness_status&.total_parents || 0
  parent_count.zero?
end
|
#state_machine ⇒ Object
State machine integration
172
173
174
175
176
177
178
|
# File 'app/models/tasker/workflow_step.rb', line 172
# Returns the step's state machine, building it on first use and reusing
# the same instance afterwards.
#
# @return [Tasker::StateMachine::StepStateMachine]
def state_machine
  return @state_machine if @state_machine

  @state_machine = Tasker::StateMachine::StepStateMachine.new(
    self,
    transition_class: Tasker::WorkflowStepTransition,
    association_name: :workflow_step_transitions
  )
end
|
#status ⇒ Object
Status is now entirely managed by the state machine
181
182
183
184
185
186
187
188
189
|
# File 'app/models/tasker/workflow_step.rb', line 181
# Current status of the step. Unsaved records report PENDING; saved records
# report the state machine's current state.
#
# @return [Object] PENDING for a new record, otherwise +state_machine.current_state+
def status
  return Tasker::Constants::WorkflowStepStatuses::PENDING if new_record?

  state_machine.current_state
end
|
#step_readiness_status ⇒ Object
Helper method to get step readiness status using function-based approach
344
345
346
|
# File 'app/models/tasker/workflow_step.rb', line 344
# Returns the readiness status for this step, looked up through
# StepReadinessStatus.for_task and memoized once a value has been found.
# While the lookup yields nothing, every call performs the lookup again.
#
# @return [Object, nil] first status returned for this step, or nil
def step_readiness_status
  return @step_readiness_status if @step_readiness_status

  @step_readiness_status = StepReadinessStatus.for_task(task_id, [workflow_step_id]).first
end
|
#waiting_for_backoff? ⇒ Boolean
413
414
415
416
417
418
|
# File 'app/models/tasker/workflow_step.rb', line 413
# Whether the step has a next retry time that is still in the future.
# Returns false when there is no readiness status or no next retry time.
#
# @return [Boolean]
def waiting_for_backoff?
  retry_at = step_readiness_status&.next_retry_at
  return false unless retry_at

  retry_at > Time.current
end
|