Class: Tasker::TaskHandler::StepGroup

Inherits:
Object
Defined in:
lib/tasker/task_handler/step_group.rb

Overview

Manages and analyzes groups of workflow steps

StepGroup is used to track the status of steps in a workflow and determine whether a task is complete, can be finalized, or needs further processing. It traverses the step dependency graph to find incomplete steps.
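As a rough illustration of how a caller might act on these three outcomes, here is a minimal sketch. `GroupState` and `next_action` are hypothetical names introduced for this example, and the order of the checks is an assumption rather than Tasker's documented behavior; the only thing taken from this page is that a StepGroup answers `complete?`, `pending?`, and `error?`.

```ruby
# Hypothetical stand-in that answers the same three predicates as StepGroup.
GroupState = Struct.new(:complete, :pending, :error) do
  def complete?
    complete
  end

  def pending?
    pending
  end

  def error?
    error
  end
end

# Assumed decision order: completion first, then errors, then pending work.
# The returned symbols are illustrative labels, not Tasker statuses.
def next_action(group)
  return :mark_complete if group.complete?
  return :mark_error if group.error?
  return :reenqueue if group.pending?

  :needs_review
end

puts next_action(GroupState.new(false, true, false)) # prints reenqueue
```

Because `next_action` only relies on the three predicates, a real StepGroup returned by `StepGroup.build` could be passed in place of the stand-in.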

Constructor Details

#initialize(task, sequence, steps) ⇒ StepGroup

Initialize a new StepGroup

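Parameters:

  • task

    The task whose steps are being analyzed

  • sequence

    The step sequence; its steps form the dependency graph that is traversed

  • steps

    The steps processed in this pass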



# File 'lib/tasker/task_handler/step_group.rb', line 45

def initialize(task, sequence, steps)
  @task = task
  @sequence = sequence
  @steps = steps
end

Instance Attribute Details

#prior_incomplete_steps ⇒ Array<Tasker::WorkflowStep>

Returns the steps that were incomplete before this processing pass.

Returns:

  • (Array<Tasker::WorkflowStep>)

    Steps that were incomplete before this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 13

def prior_incomplete_steps
  @prior_incomplete_steps
end

#still_incomplete_steps ⇒ Array<Tasker::WorkflowStep>

Returns the steps that are still incomplete after this pass.

Returns:

  • (Array<Tasker::WorkflowStep>)

    Steps that are still incomplete after this pass



# File 'lib/tasker/task_handler/step_group.rb', line 19

def still_incomplete_steps
  @still_incomplete_steps
end

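#still_working_steps ⇒ Array<Tasker::WorkflowStep>

Returns the steps that are still in a working state (pending or in progress).

Returns:

  • (Array<Tasker::WorkflowStep>)

    Steps that are still in a working state (pending or in progress)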



# File 'lib/tasker/task_handler/step_group.rb', line 22

def still_working_steps
  @still_working_steps
end

#this_pass_complete_step_ids ⇒ Array<Integer>

Returns the IDs of steps completed in this processing pass.

Returns:

  • (Array<Integer>)

    IDs of steps completed in this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 25

def this_pass_complete_step_ids
  @this_pass_complete_step_ids
end

#this_pass_complete_steps ⇒ Array<Tasker::WorkflowStep>

Returns the steps that were completed in this processing pass.

Returns:

  • (Array<Tasker::WorkflowStep>)

    Steps that were completed in this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 16

def this_pass_complete_steps
  @this_pass_complete_steps
end

Class Method Details

.build(task, sequence, steps) ⇒ StepGroup

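Build a StepGroup for the given task, sequence, and steps, then run the analysis so the returned instance is ready to query

Parameters:

  • task

    The task whose steps are being analyzed

  • sequence

    The step sequence; its steps form the dependency graph that is traversed

  • steps

    The steps processed in this pass

Returns:

  • (StepGroup)

    A StepGroup on which #build has already been called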



# File 'lib/tasker/task_handler/step_group.rb', line 33

def self.build(task, sequence, steps)
  inst = new(task, sequence, steps)
  inst.build
  inst
end

Instance Method Details

#build ⇒ void

This method returns an undefined value.

Build the step group by analyzing all step collections



# File 'lib/tasker/task_handler/step_group.rb', line 54

def build
  build_prior_incomplete_steps
  build_this_pass_complete_steps
  build_still_incomplete_steps
  build_still_working_steps
end

#build_prior_incomplete_steps ⇒ void

This method returns an undefined value.

Find all steps that were incomplete prior to this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 64

def build_prior_incomplete_steps
  # Determine which steps were incomplete by traversing the entire DAG
  self.prior_incomplete_steps = []

  # Find all root steps (those without parents)
  root_steps = @sequence.steps.select { |step| step.parents.empty? }

  # Recursively traverse the DAG to find all incomplete steps
  find_incomplete_steps(root_steps, [])
end

#build_still_incomplete_steps ⇒ void

This method returns an undefined value.

Find steps that are still incomplete after this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 112

def build_still_incomplete_steps
  # What was incomplete from the prior DAG traversal that is still incomplete now
  self.still_incomplete_steps = []
  prior_incomplete_steps.each do |step|
    still_incomplete_steps << step if this_pass_complete_step_ids.exclude?(step.workflow_step_id)
  end
end
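
The loop above keeps every previously incomplete step whose ID is not among the IDs completed in this pass. A minimal sketch of the same difference follows, with a Set swapped in for the ID lookup. `OutstandingStep` and `still_incomplete` are hypothetical names for this example and are not part of Tasker; the sketch uses plain Ruby rather than ActiveSupport's `exclude?`.

```ruby
require 'set'

# Hypothetical stand-in for Tasker::WorkflowStep; only the ID is needed here.
OutstandingStep = Struct.new(:workflow_step_id)

# Return the steps from prior_incomplete whose IDs were not completed this pass.
def still_incomplete(prior_incomplete, completed_ids)
  done = completed_ids.to_set # constant-time membership checks
  prior_incomplete.reject { |step| done.include?(step.workflow_step_id) }
end

prior = [OutstandingStep.new(1), OutstandingStep.new(2), OutstandingStep.new(3)]
p still_incomplete(prior, [2]).map(&:workflow_step_id) # prints [1, 3]
```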

#build_still_working_steps ⇒ void

This method returns an undefined value.

Find steps that are still in a working state (pending/in progress)



# File 'lib/tasker/task_handler/step_group.rb', line 123

def build_still_working_steps
  # What is still working from the incomplete steps but in a valid, retryable state
  self.still_working_steps = []
  still_incomplete_steps.each do |step|
    still_working_steps << step if Tasker::Constants::VALID_STEP_STILL_WORKING_STATES.include?(step.status)
  end
end
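
To make the distinction between "incomplete" and "still working" concrete, here is a small sketch. The status strings in `STILL_WORKING_STATES` are assumed values based on the "pending/in progress" description; the real list lives in `Tasker::Constants::VALID_STEP_STILL_WORKING_STATES`. `WorkStep`, `still_working`, and `any_still_working?` are hypothetical names.

```ruby
# Hypothetical stand-in for Tasker::WorkflowStep.
WorkStep = Struct.new(:workflow_step_id, :status)

# Assumed status values; not copied from Tasker::Constants.
STILL_WORKING_STATES = %w[pending in_progress].freeze

# Keep only the incomplete steps that can still make progress.
def still_working(still_incomplete)
  still_incomplete.select { |step| STILL_WORKING_STATES.include?(step.status) }
end

# Mirrors the idea behind #pending?: true when at least one step is still working.
def any_still_working?(still_incomplete)
  still_working(still_incomplete).any?
end

incomplete = [
  WorkStep.new(1, 'pending'),
  WorkStep.new(2, 'in_progress'),
  WorkStep.new(3, 'error')
]
p still_working(incomplete).map(&:workflow_step_id) # prints [1, 2]
```

Under these assumptions, a step in an error state is incomplete but does not count as still working.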

#build_this_pass_complete_steps ⇒ void

This method returns an undefined value.

Find steps that were completed in this processing pass



# File 'lib/tasker/task_handler/step_group.rb', line 99

def build_this_pass_complete_steps
  # The steps passed into finalize are those processed in this pass
  # Check which ones completed in a valid state
  self.this_pass_complete_steps = []
  @steps.each do |step|
    this_pass_complete_steps << step if Tasker::Constants::VALID_STEP_COMPLETION_STATES.include?(step.status)
  end
  self.this_pass_complete_step_ids = this_pass_complete_steps.map(&:workflow_step_id)
end

#complete? ⇒ Boolean

Check if the task can be considered complete

A task is complete if there were no incomplete steps in the prior iteration or if all previously incomplete steps are now complete.

Returns:

  • (Boolean)

    True if the task is complete



# File 'lib/tasker/task_handler/step_group.rb', line 137

def complete?
  prior_incomplete_steps.empty? || still_incomplete_steps.empty?
end
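
The rule can be worked through on plain ID arrays. This is a sketch only: `task_complete?` is a hypothetical helper that takes IDs instead of step objects, and it derives the still-incomplete list with an array difference in the same spirit as #build_still_incomplete_steps.

```ruby
# Hypothetical helper working on step IDs rather than WorkflowStep records.
def task_complete?(prior_incomplete_ids, completed_this_pass_ids)
  still_incomplete_ids = prior_incomplete_ids - completed_this_pass_ids
  prior_incomplete_ids.empty? || still_incomplete_ids.empty?
end

p task_complete?([], [])         # prints true: nothing was outstanding
p task_complete?([1, 2], [1, 2]) # prints true: everything outstanding finished
p task_complete?([1, 2], [1])    # prints false: step 2 is still outstanding
```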

#debug_state ⇒ Hash

Get debugging state information for the step group

Returns:

  • (Hash)

    Debug information about the step group state



# File 'lib/tasker/task_handler/step_group.rb', line 167

def debug_state
  {
    total_steps: @sequence.steps.size,
    prior_incomplete_count: prior_incomplete_steps.size,
    complete_this_pass_count: this_pass_complete_steps.size,
    still_incomplete_count: still_incomplete_steps.size,
    still_working_count: still_working_steps.size,
    step_statuses: @sequence.steps.map { |s| { id: s.workflow_step_id, name: s.name, status: s.status } },
    is_complete: complete?,
    is_pending: pending?,
    has_errors: error?
  }
end

#error? ⇒ Boolean

Check if the task has any steps in error states

A task has errors if any steps are in terminal error states that can't be retried.

Returns:

  • (Boolean)

    True if the task has error steps



# File 'lib/tasker/task_handler/step_group.rb', line 155

def error?
  # Use efficient database query with existing failed scope
  step_ids = @sequence.steps.map(&:workflow_step_id)
  return false if step_ids.empty?

  # Query for any steps in error state using the failed scope
  Tasker::WorkflowStep.failed.exists?(workflow_step_id: step_ids)
end

#find_incomplete_steps(steps, visited_step_ids) ⇒ void

This method returns an undefined value.

Recursively traverse the DAG to find all incomplete steps

Parameters:

  • steps (Array<Tasker::WorkflowStep>)

    Steps to check

  • visited_step_ids (Array<Integer>)

    IDs of steps already visited



# File 'lib/tasker/task_handler/step_group.rb', line 80

def find_incomplete_steps(steps, visited_step_ids)
  steps.each do |step|
    # Skip if we've already visited this step (avoid cycles, though they shouldn't exist in a DAG)
    next if visited_step_ids.include?(step.workflow_step_id)

    # Add this step to visited
    visited_step_ids << step.workflow_step_id

    # Add to prior_incomplete_steps if this step is incomplete
    prior_incomplete_steps << step if Tasker::Constants::VALID_STEP_COMPLETION_STATES.exclude?(step.status)

    # Recursively check all children
    find_incomplete_steps(step.children, visited_step_ids)
  end
end
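
The traversal can be tried on a small diamond-shaped graph, where one step is reachable through two parents. The sketch below is a standalone approximation, not the class's own code: `GraphStep`, `COMPLETION_STATES`, and `collect_incomplete` are hypothetical names, the status strings are assumed values, and a Set replaces the array of visited IDs used above.

```ruby
require 'set'

# Hypothetical stand-in for Tasker::WorkflowStep with only the fields the walk reads.
GraphStep = Struct.new(:workflow_step_id, :status, :children)

# Assumed completion states; the real list is Tasker::Constants::VALID_STEP_COMPLETION_STATES.
COMPLETION_STATES = %w[complete resolved_manually].freeze

# Depth-first walk from the given steps, collecting each incomplete step once.
def collect_incomplete(steps, visited = Set.new, incomplete = [])
  steps.each do |step|
    # Set#add? returns nil when the ID was already present, so shared children are skipped
    next unless visited.add?(step.workflow_step_id)

    incomplete << step unless COMPLETION_STATES.include?(step.status)
    collect_incomplete(step.children, visited, incomplete)
  end
  incomplete
end

# Diamond: 1 -> 2 -> 4 and 1 -> 3 -> 4
leaf  = GraphStep.new(4, 'pending', [])
left  = GraphStep.new(2, 'complete', [leaf])
right = GraphStep.new(3, 'pending', [leaf])
root  = GraphStep.new(1, 'complete', [left, right])

p collect_incomplete([root]).map(&:workflow_step_id) # prints [4, 3]
```

Step 4 appears once even though both step 2 and step 3 lead to it, which is the purpose of the visited check.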

#pending? ⇒ Boolean

Check if the task should be marked as pending for further processing

A task is considered pending if there are still steps in a working state.

Returns:

  • (Boolean)

    True if the task should be pending



# File 'lib/tasker/task_handler/step_group.rb', line 146

def pending?
  still_working_steps.length.positive?
end