Class: Tasker::WorkflowStep
- Inherits: ApplicationRecord
  - Object
  - ActiveRecord::Base
  - ApplicationRecord
  - Tasker::WorkflowStep
- Defined in:
- app/models/tasker/workflow_step.rb
Defined Under Namespace
Classes: StepFinder
Constant Summary
- PROVIDES_EDGE_NAME = 'provides'
Class Method Summary
- .build_default_step!(task, template, named_step) ⇒ Object
- .by_current_state ⇒ ActiveRecord::Relation
  Scopes workflow steps by their current state using state machine transitions.
- .completed_since ⇒ ActiveRecord::Relation
  Scopes workflow steps completed since a specific time.
- .failed_since ⇒ ActiveRecord::Relation
  Scopes workflow steps that failed since a specific time.
- .find_step_by_name(steps, name) ⇒ WorkflowStep?
  Finds a WorkflowStep with the given name by traversing the DAG efficiently.
- .for_tasks_since ⇒ ActiveRecord::Relation
  Scopes workflow steps for tasks created since a specific time.
- .get_steps_for_task(task, templates) ⇒ Object
- .get_viable_steps(task, sequence) ⇒ Object
- .set_up_dependent_steps(steps, templates) ⇒ Object
- .task_completion_stats(task) ⇒ Hash
  Efficiently computes task completion statistics using ActiveRecord scopes. This avoids the N+1 query problem while working with the state machine system.
Instance Method Summary
- #add_provides_edge!(to_step) ⇒ Object
- #can_retry_now? ⇒ Boolean
- #cancelled? ⇒ Boolean
- #complete? ⇒ Boolean
- #dependencies_satisfied? ⇒ Boolean
  Function-based predicate methods.
- #has_retry_attempts? ⇒ Boolean
- #in_error? ⇒ Boolean
- #in_progress? ⇒ Boolean
- #leaf_step? ⇒ Boolean
- #pending? ⇒ Boolean
- #ready? ⇒ Boolean
- #ready_status? ⇒ Boolean
- #reload ⇒ Object
- #retry_eligible? ⇒ Boolean
- #retry_exhausted? ⇒ Boolean
- #root_step? ⇒ Boolean
- #state_machine ⇒ Object
  State machine integration.
- #status ⇒ Object
  Status is now entirely managed by the state machine.
- #step_readiness_status ⇒ Object
  Helper method to get step readiness status using function-based approach.
- #waiting_for_backoff? ⇒ Boolean
Methods inherited from ApplicationRecord
configure_database_connections, database_configuration_exists?
Class Method Details
.build_default_step!(task, template, named_step) ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 300

def self.build_default_step!(task, template, named_step)
  # Create the step first without status
  step_attributes = {
    task_id: task.task_id,
    named_step_id: named_step.named_step_id,
    retryable: template.default_retryable,
    retry_limit: template.default_retry_limit,
    skippable: template.skippable,
    in_process: false,
    inputs: task.context,
    processed: false,
    attempts: 0,
    results: {}
  }

  step = new(step_attributes)
  step.save!

  # REMOVED: Automatic state machine initialization to prevent duplicate key violations
  # The state machine will initialize naturally when accessed, and factories may
  # have already created transitions through their own setup
  # step.state_machine.initialize_state_machine!

  step
end
.by_current_state ⇒ ActiveRecord::Relation
Scopes workflow steps by their current state using state machine transitions
# File 'app/models/tasker/workflow_step.rb', line 88

scope :by_current_state, lambda { |state = nil|
  relation = joins(<<-SQL.squish)
    INNER JOIN (
      SELECT DISTINCT ON (workflow_step_id) workflow_step_id, to_state
      FROM tasker_workflow_step_transitions
      WHERE most_recent = true
      ORDER BY workflow_step_id, sort_key DESC
    ) current_transitions ON current_transitions.workflow_step_id = tasker_workflow_steps.workflow_step_id
  SQL

  if state.present?
    relation.where(current_transitions: { to_state: state })
  else
    relation
  end
}
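The selection performed by the joined subquery can be sketched in plain Ruby, without a database. This is only an illustration of the logic, not how the scope runs: the `Transition` struct and the `current_states` helper are hypothetical names, and the rows are made-up sample data. Because the scope uses an inner join, a step with no transition flagged `most_recent` does not appear in the result at all.

```ruby
# Hypothetical in-memory stand-in for a row of tasker_workflow_step_transitions.
Transition = Struct.new(:workflow_step_id, :to_state, :sort_key, :most_recent)

# For each step, keep the to_state of its highest-sort_key transition
# among those flagged most_recent (the DISTINCT ON ... ORDER BY part).
def current_states(transitions)
  transitions
    .select(&:most_recent)
    .group_by(&:workflow_step_id)
    .transform_values { |rows| rows.max_by(&:sort_key).to_state }
end

rows = [
  Transition.new(1, 'pending', 1, false),
  Transition.new(1, 'complete', 2, true),
  Transition.new(2, 'error', 1, true)
]

states = current_states(rows)

# Equivalent of passing a state argument to the scope: filter on to_state.
errored_step_ids = states.select { |_id, state| state == 'error' }.keys
```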
.completed_since ⇒ ActiveRecord::Relation
Scopes workflow steps completed since a specific time
# File 'app/models/tasker/workflow_step.rb', line 110

scope :completed_since, lambda { |since_time|
  joins(:workflow_step_transitions)
    .where('tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?', true, 'complete')
    .where('tasker_workflow_step_transitions.created_at > ?', since_time)
}
.failed_since ⇒ ActiveRecord::Relation
Scopes workflow steps that failed since a specific time
# File 'app/models/tasker/workflow_step.rb', line 121

scope :failed_since, lambda { |since_time|
  joins(:workflow_step_transitions)
    .where('tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?', true, 'error')
    .where('tasker_workflow_step_transitions.created_at > ?', since_time)
}
.find_step_by_name(steps, name) ⇒ WorkflowStep?
Finds a WorkflowStep with the given name by traversing the DAG efficiently
# File 'app/models/tasker/workflow_step.rb', line 195

def self.find_step_by_name(steps, name)
  StepFinder.find_by_name(steps, name)
end
.for_tasks_since ⇒ ActiveRecord::Relation
Scopes workflow steps for tasks created since a specific time
# File 'app/models/tasker/workflow_step.rb', line 132

scope :for_tasks_since, lambda { |since_time|
  joins(:task).where('tasker_tasks.created_at > ?', since_time)
}
.get_steps_for_task(task, templates) ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 272

def self.get_steps_for_task(task, templates)
  named_steps = NamedStep.create_named_steps_from_templates(templates)
  steps = templates.map do |template|
    named_step = named_steps.find { |ns| template.name == ns.name }
    NamedTasksNamedStep.associate_named_step_with_named_task(task.named_task, template, named_step)
    step = where(task_id: task.task_id, named_step_id: named_step.named_step_id).first
    step ||= build_default_step!(task, template, named_step)
    step
  end
  set_up_dependent_steps(steps, templates)
end
.get_viable_steps(task, sequence) ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 326

def self.get_viable_steps(task, sequence)
  # Get step IDs from sequence
  step_ids = sequence.steps.map(&:workflow_step_id)

  # Use function-based approach for high-performance readiness checking
  ready_statuses = StepReadinessStatus.for_task(task.task_id, step_ids)
  ready_step_ids = ready_statuses.select(&:ready_for_execution).map(&:workflow_step_id)

  # Return WorkflowStep objects for ready steps
  WorkflowStep.where(workflow_step_id: ready_step_ids)
              .includes(:named_step)
end
.set_up_dependent_steps(steps, templates) ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 285

def self.set_up_dependent_steps(steps, templates)
  templates.each do |template|
    next if template.all_dependencies.empty?

    dependent_step = steps.find { |step| step.name == template.name }
    template.all_dependencies.each do |dependency|
      provider_step = steps.find { |step| step.name == dependency }
      unless provider_step.outgoing_edges.exists?(to_step: dependent_step)
        provider_step.add_provides_edge!(dependent_step)
      end
    end
  end
  steps
end
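The edge wiring above can be illustrated with plain Ruby data, leaving out the database. This is a rough sketch under assumptions: `Template` here is a hypothetical two-field struct (only `name` and `all_dependencies` are taken from the listing), `provides_edges` is an invented helper, and a `Set` of name pairs stands in for the `exists?` check that keeps edges from being created twice.

```ruby
require 'set'

# Hypothetical stand-in for a step template: a name plus the names it depends on.
Template = Struct.new(:name, :all_dependencies)

# Returns [provider, dependent] name pairs. Starting from the existing set
# makes repeated calls idempotent, like the exists? guard in the listing.
def provides_edges(templates, existing = Set.new)
  edges = existing.dup
  templates.each do |template|
    template.all_dependencies.each do |dependency|
      edges << [dependency, template.name]
    end
  end
  edges
end

templates = [
  Template.new('fetch', []),
  Template.new('validate', ['fetch']),
  Template.new('publish', %w[fetch validate])
]

edges = provides_edges(templates)
```

Note that in the listing `provider_step` comes from `steps.find`, which returns `nil` when no step has the dependency's name, so the method appears to rely on every dependency having a matching step.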
.task_completion_stats(task) ⇒ Hash
Efficiently computes task completion statistics using ActiveRecord scopes. This avoids the N+1 query problem while working with the state machine system.
# File 'app/models/tasker/workflow_step.rb', line 141

def self.task_completion_stats(task)
  # Use efficient ActiveRecord queries with the state machine
  task_steps = for_task(task)

  # Get completion statistics with optimized queries
  total_steps = task_steps.count
  completed_steps = task_steps.completed
  failed_steps = task_steps.failed

  # Calculate counts
  completed_count = completed_steps.count
  failed_count = failed_steps.count

  # For pending count, calculate as total minus completed and failed
  # This handles the case where new steps don't have transitions yet
  pending_count = total_steps - completed_count - failed_count

  # Get latest completion time from completed steps
  latest_completion_time = completed_steps.maximum(:processed_at)

  {
    total_steps: total_steps,
    completed_steps: completed_count,
    failed_steps: failed_count,
    pending_steps: pending_count,
    latest_completion_time: latest_completion_time,
    all_complete: completed_count == total_steps && total_steps.positive?
  }
end
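The arithmetic behind the returned hash can be tried out in plain Ruby. This is a minimal sketch, not the model's implementation: `completion_stats` is a hypothetical helper, the input is an array of current-state strings (`nil` for a step with no transition yet), and it assumes the `completed` and `failed` scopes correspond to the `'complete'` and `'error'` states used by `completed_since` and `failed_since`. `latest_completion_time` is omitted because it depends on `processed_at`.

```ruby
# Hypothetical helper: derives the same counts from a plain array of states.
def completion_stats(states)
  total = states.size
  completed = states.count('complete')
  failed = states.count('error')

  {
    total_steps: total,
    completed_steps: completed,
    failed_steps: failed,
    # Derived by subtraction, so anything neither complete nor failed lands here.
    pending_steps: total - completed - failed,
    # An empty task is never reported as complete.
    all_complete: completed == total && total.positive?
  }
end

stats = completion_stats(['complete', 'error', nil, 'in_progress'])
```

Because the pending count is computed by subtraction, it covers in-progress steps as well as steps that have no transitions yet.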
Instance Method Details
#add_provides_edge!(to_step) ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 339

def add_provides_edge!(to_step)
  outgoing_edges.create!(to_step: to_step, name: PROVIDES_EDGE_NAME)
end
#can_retry_now? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 420

def can_retry_now?
  # Comprehensive check if step can be retried right now
  return false unless in_error?
  return false unless retry_eligible?
  return false if waiting_for_backoff?

  true
end
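The three guards above can be exercised without a database by collapsing them into one function over a readiness record. This is a sketch under assumptions: the `Readiness` struct is a hypothetical stand-in for whatever `step_readiness_status` returns, the free function is invented for illustration, and `'error'` is assumed to be the value of `Constants::WorkflowStepStatuses::ERROR`. `Time.now` replaces `Time.current` so the example runs without ActiveSupport.

```ruby
# Hypothetical stand-in for the readiness record the predicates read from.
Readiness = Struct.new(:current_state, :retry_eligible, :next_retry_at, keyword_init: true)

# Combines in_error?, retry_eligible? and waiting_for_backoff? in the same order.
def can_retry_now?(readiness, now: Time.now)
  return false unless readiness # no readiness record: nothing to retry
  return false unless readiness.current_state == 'error'
  return false unless readiness.retry_eligible
  return false if readiness.next_retry_at && readiness.next_retry_at > now

  true
end

now = Time.at(1_000)
retryable = Readiness.new(current_state: 'error', retry_eligible: true)
backing_off = Readiness.new(current_state: 'error', retry_eligible: true, next_retry_at: now + 60)
```

Here `can_retry_now?(retryable, now: now)` passes all three guards, while `backing_off` is rejected by the last one until its `next_retry_at` has passed.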
#cancelled? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 371

def cancelled?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::CANCELLED
end
#complete? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 348

def complete?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state&.in?([
    Constants::WorkflowStepStatuses::COMPLETE,
    Constants::WorkflowStepStatuses::RESOLVED_MANUALLY
  ]) || false
end
#dependencies_satisfied? ⇒ Boolean
Function-based predicate methods
# File 'app/models/tasker/workflow_step.rb', line 389

def dependencies_satisfied?
  # Use function-based approach's pre-calculated dependency analysis
  step_readiness_status&.dependencies_satisfied || false
end
#has_retry_attempts? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 399

def has_retry_attempts?
  # Check if step has made retry attempts
  (step_readiness_status&.attempts || 0).positive?
end
#in_error? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 366

def in_error?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::ERROR
end
#in_progress? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 356

def in_progress?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::IN_PROGRESS
end
#leaf_step? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 434

def leaf_step?
  # Check if this is a leaf step using DAG relationship view
  step_dag_relationship&.child_count&.zero?
end
#pending? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 361

def pending?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::PENDING
end
#ready? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 383

def ready?
  # Use function-based approach's comprehensive readiness calculation
  step_readiness_status&.ready_for_execution || false
end
#ready_status? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 376

def ready_status?
  # Use function-based approach for efficient ready status checking
  Constants::UNREADY_WORKFLOW_STEP_STATUSES.exclude?(
    step_readiness_status&.current_state || Constants::WorkflowStepStatuses::PENDING
  )
end
#reload ⇒ Object
# File 'app/models/tasker/workflow_step.rb', line 439

def reload
  # Override reload to ensure step readiness status is refreshed
  super.tap do
    @step_readiness_status = nil # Reset cached readiness status
  end
end
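The cache-reset pattern in this override can be seen in isolation with two small classes. This is an illustrative sketch only: `Record` is a hypothetical base class whose `reload` returns `self` (as ActiveRecord's does), and `CachedStep` with its `lookups` counter is invented to make the number of lookups observable.

```ruby
# Hypothetical base class standing in for the inherited reload, which returns self.
class Record
  def reload
    self
  end
end

class CachedStep < Record
  attr_reader :lookups

  def initialize
    super
    @lookups = 0
  end

  # Memoized lookup; the counter records how often the "query" actually runs.
  def readiness
    @readiness ||= begin
      @lookups += 1
      :status
    end
  end

  # Same shape as the override in the listing: call super, then drop the cache.
  def reload
    super.tap { @readiness = nil }
  end
end

step = CachedStep.new
step.readiness
step.readiness # served from the cache, no second lookup
```

Since `tap` returns its receiver, the override still returns the record itself, so chained calls such as `step.reload.readiness` keep working and trigger a fresh lookup.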
#retry_eligible? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 394

def retry_eligible?
  # Use function-based approach's retry/backoff calculation
  step_readiness_status&.retry_eligible || false
end
#retry_exhausted? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 404

def retry_exhausted?
  # Check if step has exhausted retry attempts
  return false unless step_readiness_status

  attempts = step_readiness_status.attempts || 0
  retry_limit = step_readiness_status.retry_limit || 3
  attempts >= retry_limit
end
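The fallback values in this comparison are easy to overlook, so here is a small sketch of just that part. The free function below is hypothetical and takes the two values directly; `nil` arguments model missing `attempts` or `retry_limit` values on the readiness record.

```ruby
# Hypothetical free-function version of the comparison in the listing.
# Missing attempts count as 0 and a missing limit falls back to 3.
def retry_exhausted?(attempts, retry_limit)
  (attempts || 0) >= (retry_limit || 3)
end

exhausted_by_default_limit = retry_exhausted?(3, nil)
fresh_step = retry_exhausted?(nil, nil)
```

The comparison is `>=`, so a step whose attempts equal its limit already counts as exhausted.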
#root_step? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 429

def root_step?
  # Check if this is a root step (no dependencies)
  (step_readiness_status&.total_parents || 0).zero?
end
#state_machine ⇒ Object
State machine integration
# File 'app/models/tasker/workflow_step.rb', line 172

def state_machine
  @state_machine ||= Tasker::StateMachine::StepStateMachine.new(
    self,
    transition_class: Tasker::WorkflowStepTransition,
    association_name: :workflow_step_transitions
  )
end
#status ⇒ Object
Status is now entirely managed by the state machine
# File 'app/models/tasker/workflow_step.rb', line 181

def status
  if new_record?
    # For new records, return the initial state
    Tasker::Constants::WorkflowStepStatuses::PENDING
  else
    # For persisted records, use state machine
    state_machine.current_state
  end
end
#step_readiness_status ⇒ Object
Helper method to get step readiness status using function-based approach
# File 'app/models/tasker/workflow_step.rb', line 344

def step_readiness_status
  @step_readiness_status ||= StepReadinessStatus.for_task(task_id, [workflow_step_id]).first
end
#waiting_for_backoff? ⇒ Boolean
# File 'app/models/tasker/workflow_step.rb', line 413

def waiting_for_backoff?
  # Check if step is waiting for backoff period to expire
  return false unless step_readiness_status&.next_retry_at

  step_readiness_status.next_retry_at > Time.current
end