Class: Tasker::TaskHandler::StepGroup
- Inherits: Object
  - Object
    - Tasker::TaskHandler::StepGroup
- Defined in: lib/tasker/task_handler/step_group.rb
Overview
Manages and analyzes groups of workflow steps
StepGroup tracks the status of the steps in a workflow and determines whether a task is complete, can be finalized, or needs further processing. It traverses the step dependency graph to find incomplete steps.
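As a rough illustration of how a caller might act on the three predicates, the sketch below maps them to a next action. `FakeGroup` and `next_action` are hypothetical stand-ins rather than part of Tasker, and the order in which the predicates are checked is an assumption.

```ruby
# Hypothetical stand-in that exposes the same predicates as StepGroup.
FakeGroup = Struct.new(:complete, :pending, :error) do
  def complete?
    complete
  end

  def pending?
    pending
  end

  def error?
    error
  end
end

# Hypothetical helper: the check order is an assumption, not Tasker's logic.
def next_action(group)
  return :finalize if group.complete?
  return :fail if group.error?
  return :requeue if group.pending?

  :investigate
end

puts next_action(FakeGroup.new(false, true, false))
```

With a real task, the group would presumably come from `StepGroup.build(task, sequence, steps)` instead of the stand-in.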
Instance Attribute Summary
- #prior_incomplete_steps ⇒ Array<Tasker::WorkflowStep>
  Steps that were incomplete before this processing pass.
- #still_incomplete_steps ⇒ Array<Tasker::WorkflowStep>
  Steps that are still incomplete after this pass.
- #still_working_steps ⇒ Array<Tasker::WorkflowStep>
  Steps that are still in a working state (pending/in progress).
- #this_pass_complete_step_ids ⇒ Array<Integer>
  IDs of steps completed in this processing pass.
- #this_pass_complete_steps ⇒ Array<Tasker::WorkflowStep>
  Steps that were completed in this processing pass.
Class Method Summary
- .build(task, sequence, steps) ⇒ StepGroup
  Build a StepGroup for the given task, sequence and steps.
Instance Method Summary
- #build ⇒ void
  Build the step group by analyzing all step collections.
- #build_prior_incomplete_steps ⇒ void
  Find all steps that were incomplete prior to this processing pass.
- #build_still_incomplete_steps ⇒ void
  Find steps that are still incomplete after this processing pass.
- #build_still_working_steps ⇒ void
  Find steps that are still in a working state (pending/in progress).
- #build_this_pass_complete_steps ⇒ void
  Find steps that were completed in this processing pass.
- #complete? ⇒ Boolean
  Check if the task can be considered complete.
- #debug_state ⇒ Hash
  Get debugging state information for the step group.
- #error? ⇒ Boolean
  Check if the task has any steps in error states.
- #find_incomplete_steps(steps, visited_step_ids) ⇒ void
  Recursively traverse the DAG to find all incomplete steps.
- #initialize(task, sequence, steps) ⇒ StepGroup (constructor)
  Initialize a new StepGroup.
- #pending? ⇒ Boolean
  Check if the task should be marked as pending for further processing.
Constructor Details
#initialize(task, sequence, steps) ⇒ StepGroup
Initialize a new StepGroup
    # File 'lib/tasker/task_handler/step_group.rb', line 45
    def initialize(task, sequence, steps)
      @task = task
      @sequence = sequence
      @steps = steps
    end
Instance Attribute Details
#prior_incomplete_steps ⇒ Array<Tasker::WorkflowStep>
Returns (Array<Tasker::WorkflowStep>): Steps that were incomplete before this processing pass.
    # File 'lib/tasker/task_handler/step_group.rb', line 13
    def prior_incomplete_steps
      @prior_incomplete_steps
    end
#still_incomplete_steps ⇒ Array<Tasker::WorkflowStep>
Returns (Array<Tasker::WorkflowStep>): Steps that are still incomplete after this pass.
    # File 'lib/tasker/task_handler/step_group.rb', line 19
    def still_incomplete_steps
      @still_incomplete_steps
    end
#still_working_steps ⇒ Array<Tasker::WorkflowStep>
Returns (Array<Tasker::WorkflowStep>): Steps that are still in a working state (pending/in progress).
    # File 'lib/tasker/task_handler/step_group.rb', line 22
    def still_working_steps
      @still_working_steps
    end
#this_pass_complete_step_ids ⇒ Array<Integer>
Returns (Array<Integer>): IDs of steps completed in this processing pass.
    # File 'lib/tasker/task_handler/step_group.rb', line 25
    def this_pass_complete_step_ids
      @this_pass_complete_step_ids
    end
#this_pass_complete_steps ⇒ Array<Tasker::WorkflowStep>
Returns (Array<Tasker::WorkflowStep>): Steps that were completed in this processing pass.
    # File 'lib/tasker/task_handler/step_group.rb', line 16
    def this_pass_complete_steps
      @this_pass_complete_steps
    end
Class Method Details
.build(task, sequence, steps) ⇒ StepGroup
Build a StepGroup for the given task, sequence and steps
    # File 'lib/tasker/task_handler/step_group.rb', line 33
    def self.build(task, sequence, steps)
      inst = new(task, sequence, steps)
      inst.build
      inst
    end
Instance Method Details
#build ⇒ void
This method returns an undefined value.
Build the step group by analyzing all step collections
    # File 'lib/tasker/task_handler/step_group.rb', line 54
    def build
      build_prior_incomplete_steps
      build_this_pass_complete_steps
      build_still_incomplete_steps
      build_still_working_steps
    end
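The four stages called by #build can be sketched end to end over plain objects. This is a minimal sketch, not the library's implementation: `Step`, `analyze_pass`, and both status lists are hypothetical (the real lists live in `Tasker::Constants` and may differ), the first stage is a flat scan rather than a DAG walk, and the sequence is assumed to hold statuses from before the pass.

```ruby
# Assumed status lists; the real values live in Tasker::Constants and may differ.
COMPLETION_STATES    = %w[complete].freeze
STILL_WORKING_STATES = %w[pending in_progress].freeze

# Hypothetical stand-in for Tasker::WorkflowStep.
Step = Struct.new(:workflow_step_id, :status)

# Runs the four stages in the same order as #build and returns the resulting IDs.
def analyze_pass(sequence_steps, processed_steps)
  # 1. Incomplete before the pass (a flat scan here; the real method walks the DAG).
  prior_incomplete = sequence_steps.reject { |s| COMPLETION_STATES.include?(s.status) }

  # 2. Processed steps that ended in a completion state, plus their IDs.
  this_pass_complete = processed_steps.select { |s| COMPLETION_STATES.include?(s.status) }
  complete_ids = this_pass_complete.map(&:workflow_step_id)

  # 3. Previously incomplete steps that did not complete in this pass.
  still_incomplete = prior_incomplete.reject { |s| complete_ids.include?(s.workflow_step_id) }

  # 4. Of those, the ones still in a working state.
  still_working = still_incomplete.select { |s| STILL_WORKING_STATES.include?(s.status) }

  {
    prior_incomplete: prior_incomplete.map(&:workflow_step_id),
    this_pass_complete: complete_ids,
    still_incomplete: still_incomplete.map(&:workflow_step_id),
    still_working: still_working.map(&:workflow_step_id)
  }
end

# Assumption: the sequence reflects statuses from before the pass.
sequence_steps = [Step.new(1, 'complete'), Step.new(2, 'pending'), Step.new(3, 'pending')]
processed_steps = [Step.new(2, 'complete')]

p analyze_pass(sequence_steps, processed_steps)
```

Each stage reads the result of an earlier one, which is why the order of the calls in #build matters.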
#build_prior_incomplete_steps ⇒ void
This method returns an undefined value.
Find all steps that were incomplete prior to this processing pass
    # File 'lib/tasker/task_handler/step_group.rb', line 64
    def build_prior_incomplete_steps
      # Determine which steps were incomplete by traversing the entire DAG
      self.prior_incomplete_steps = []

      # Find all root steps (those without parents)
      root_steps = @sequence.steps.select { |step| step.parents.empty? }

      # Recursively traverse the DAG to find all incomplete steps
      find_incomplete_steps(root_steps, [])
    end
#build_still_incomplete_steps ⇒ void
This method returns an undefined value.
Find steps that are still incomplete after this processing pass
    # File 'lib/tasker/task_handler/step_group.rb', line 112
    def build_still_incomplete_steps
      # What was incomplete from the prior DAG traversal that is still incomplete now
      self.still_incomplete_steps = []
      prior_incomplete_steps.each do |step|
        still_incomplete_steps << step if this_pass_complete_step_ids.exclude?(step.workflow_step_id)
      end
    end
#build_still_working_steps ⇒ void
This method returns an undefined value.
Find steps that are still in a working state (pending/in progress)
    # File 'lib/tasker/task_handler/step_group.rb', line 123
    def build_still_working_steps
      # What is still working from the incomplete steps but in a valid, retryable state
      self.still_working_steps = []
      still_incomplete_steps.each do |step|
        still_working_steps << step if Tasker::Constants::VALID_STEP_STILL_WORKING_STATES.include?(step.status)
      end
    end
#build_this_pass_complete_steps ⇒ void
This method returns an undefined value.
Find steps that were completed in this processing pass
    # File 'lib/tasker/task_handler/step_group.rb', line 99
    def build_this_pass_complete_steps
      # The steps passed into finalize are those processed in this pass
      # Check which ones completed in a valid state
      self.this_pass_complete_steps = []
      @steps.each do |step|
        this_pass_complete_steps << step if Tasker::Constants::VALID_STEP_COMPLETION_STATES.include?(step.status)
      end
      self.this_pass_complete_step_ids = this_pass_complete_steps.map(&:workflow_step_id)
    end
#complete? ⇒ Boolean
Check if the task can be considered complete
A task is complete if there were no incomplete steps in the prior iteration or if all previously incomplete steps are now complete.
    # File 'lib/tasker/task_handler/step_group.rb', line 137
    def complete?
      prior_incomplete_steps.empty? || still_incomplete_steps.empty?
    end
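The two branches of the completion check can be exercised over plain arrays. `group_complete?` is a hypothetical helper that only mirrors the expression above; the symbols stand in for step objects.

```ruby
# Hypothetical helper mirroring the body of #complete? over plain arrays.
def group_complete?(prior_incomplete, still_incomplete)
  prior_incomplete.empty? || still_incomplete.empty?
end

# Nothing was incomplete before the pass.
puts group_complete?([], [])

# Everything that was incomplete finished during the pass.
puts group_complete?([:step_a], [])

# One step is still outstanding.
puts group_complete?([:step_a, :step_b], [:step_b])
```

Because the still-incomplete list is filtered from the prior-incomplete list, an empty prior list also means an empty still-incomplete list.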
#debug_state ⇒ Hash
Get debugging state information for the step group
    # File 'lib/tasker/task_handler/step_group.rb', line 167
    def debug_state
      {
        total_steps: @sequence.steps.size,
        prior_incomplete_count: prior_incomplete_steps.size,
        complete_this_pass_count: this_pass_complete_steps.size,
        still_incomplete_count: still_incomplete_steps.size,
        still_working_count: still_working_steps.size,
        step_statuses: @sequence.steps.map { |s| { id: s.workflow_step_id, name: s.name, status: s.status } },
        is_complete: complete?,
        is_pending: pending?,
        has_errors: error?
      }
    end
#error? ⇒ Boolean
Check if the task has any steps in error states
A task has errors if any steps are in terminal error states that can't be retried.
    # File 'lib/tasker/task_handler/step_group.rb', line 155
    def error?
      # Use efficient database query with existing failed scope
      step_ids = @sequence.steps.map(&:workflow_step_id)
      return false if step_ids.empty?

      # Query for any steps in error state using the failed scope
      Tasker::WorkflowStep.failed.exists?(workflow_step_id: step_ids)
    end
#find_incomplete_steps(steps, visited_step_ids) ⇒ void
This method returns an undefined value.
Recursively traverse the DAG to find all incomplete steps
    # File 'lib/tasker/task_handler/step_group.rb', line 80
    def find_incomplete_steps(steps, visited_step_ids)
      steps.each do |step|
        # Skip if we've already visited this step (avoid cycles, though they shouldn't exist in a DAG)
        next if visited_step_ids.include?(step.workflow_step_id)

        # Add this step to visited
        visited_step_ids << step.workflow_step_id

        # Add to prior_incomplete_steps if this step is incomplete
        prior_incomplete_steps << step if Tasker::Constants::VALID_STEP_COMPLETION_STATES.exclude?(step.status)

        # Recursively check all children
        find_incomplete_steps(step.children, visited_step_ids)
      end
    end
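The effect of the visited list is easiest to see on a diamond-shaped graph, where one step has two parents. The sketch below is a standalone approximation: `Node`, `collect_incomplete`, and the single-entry `COMPLETION_STATES` list are hypothetical, and the helper takes an explicit accumulator instead of appending to an attribute.

```ruby
# Assumed completion states; the real list lives in Tasker::Constants.
COMPLETION_STATES = %w[complete].freeze

# Hypothetical stand-in for a workflow step with child links.
Node = Struct.new(:workflow_step_id, :status, :children)

# Depth-first walk that records each incomplete step once.
def collect_incomplete(steps, visited_ids, found)
  steps.each do |step|
    # A step reachable through several parents is only processed once.
    next if visited_ids.include?(step.workflow_step_id)

    visited_ids << step.workflow_step_id
    found << step unless COMPLETION_STATES.include?(step.status)
    collect_incomplete(step.children, visited_ids, found)
  end
  found
end

# Diamond: step 1 feeds steps 2 and 3, which both feed step 4.
d = Node.new(4, 'pending', [])
b = Node.new(2, 'complete', [d])
c = Node.new(3, 'in_progress', [d])
a = Node.new(1, 'complete', [b, c])

p collect_incomplete([a], [], []).map(&:workflow_step_id) # prints [4, 3]
```

Step 4 is reached through step 2 first, so it is recorded before step 3 and skipped when the walk arrives again through step 3.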
#pending? ⇒ Boolean
Check if the task should be marked as pending for further processing
A task is considered pending if there are still steps in a working state.
    # File 'lib/tasker/task_handler/step_group.rb', line 146
    def pending?
      still_working_steps.length.positive?
    end