Class: Tasker::WorkflowStep

Inherits:
ApplicationRecord show all
Defined in:
app/models/tasker/workflow_step.rb

Defined Under Namespace

Classes: StepFinder

Constant Summary collapse

PROVIDES_EDGE_NAME =
'provides'

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ApplicationRecord

configure_database_connections, database_configuration_exists?

Class Method Details

.build_default_step!(task, template, named_step) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'app/models/tasker/workflow_step.rb', line 300

# Builds and persists a new workflow step for +task+ from a step template.
#
# The step starts out unprocessed, not in process, with zero attempts and
# empty results. Retry/skip settings are copied from the template and the
# task's context is copied into the step's inputs.
#
# @param task [#task_id, #context] the task that will own the step
# @param template [#default_retryable, #default_retry_limit, #skippable] source of retry/skip settings
# @param named_step [#named_step_id] the named step this workflow step instantiates
# @return [Object] the saved step
# @raise whatever +save!+ raises when the step cannot be persisted
def self.build_default_step!(task, template, named_step)
  identity = {
    task_id: task.task_id,
    named_step_id: named_step.named_step_id
  }
  template_settings = {
    retryable: template.default_retryable,
    retry_limit: template.default_retry_limit,
    skippable: template.skippable
  }
  initial_progress = {
    in_process: false,
    inputs: task.context,
    processed: false,
    attempts: 0,
    results: {}
  }

  # No status is written here, and the state machine is deliberately NOT
  # initialized: doing so eagerly caused duplicate key violations, because
  # factories may already have created transitions through their own setup.
  # The state machine initializes on first access instead.
  new(identity.merge(template_settings).merge(initial_progress)).tap(&:save!)
end

.by_current_state(state = nil) ⇒ ActiveRecord::Relation

Scopes workflow steps by their current state using state machine transitions

Parameters:

  • state (String, nil)

    The state to filter by. If nil, returns all steps with current state information

Returns:

  • (ActiveRecord::Relation)

    Steps with current state, optionally filtered by specific state



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'app/models/tasker/workflow_step.rb', line 88

# Joins each workflow step to its current transition row and optionally
# filters on that transition's +to_state+.
#
# The joined subquery (aliased +current_transitions+) keeps one row per
# +workflow_step_id+: among rows flagged +most_recent = true+ it takes the
# one with the highest +sort_key+. Because this is an INNER JOIN, steps with
# no such transition row are not returned.
#
# @param state [String, nil] state to filter by; when blank, no state filter is applied
# @return [ActiveRecord::Relation]
scope :by_current_state, lambda { |state = nil|
  current_transition_join = <<-SQL.squish
    INNER JOIN (
      SELECT DISTINCT ON (workflow_step_id) workflow_step_id, to_state
      FROM tasker_workflow_step_transitions
      WHERE most_recent = true
      ORDER BY workflow_step_id, sort_key DESC
    ) current_transitions ON current_transitions.workflow_step_id = tasker_workflow_steps.workflow_step_id
  SQL

  with_current_state = joins(current_transition_join)
  state.present? ? with_current_state.where(current_transitions: { to_state: state }) : with_current_state
}

.completed_since(since_time) ⇒ ActiveRecord::Relation

Scopes workflow steps completed since a specific time

Parameters:

  • since_time (Time)

    The earliest completion time to include

Returns:

  • (ActiveRecord::Relation)

    Steps completed since the specified time



110
111
112
113
114
# File 'app/models/tasker/workflow_step.rb', line 110

# Steps whose most recent transition is to the 'complete' state and was
# created strictly after +since_time+.
#
# @param since_time [Time] exclusive lower bound on the transition's +created_at+
# @return [ActiveRecord::Relation]
scope :completed_since, lambda { |since_time|
  currently_complete = joins(:workflow_step_transitions).where(
    'tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?',
    true,
    'complete'
  )
  currently_complete.where('tasker_workflow_step_transitions.created_at > ?', since_time)
}

.failed_since(since_time) ⇒ ActiveRecord::Relation

Scopes workflow steps that failed since a specific time

Parameters:

  • since_time (Time)

    The earliest failure time to include

Returns:

  • (ActiveRecord::Relation)

    Steps that failed since the specified time



121
122
123
124
125
# File 'app/models/tasker/workflow_step.rb', line 121

# Steps whose most recent transition is to the 'error' state and was created
# strictly after +since_time+.
#
# @param since_time [Time] exclusive lower bound on the transition's +created_at+
# @return [ActiveRecord::Relation]
scope :failed_since, lambda { |since_time|
  currently_errored = joins(:workflow_step_transitions).where(
    'tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?',
    true,
    'error'
  )
  currently_errored.where('tasker_workflow_step_transitions.created_at > ?', since_time)
}

.find_step_by_name(steps, name) ⇒ WorkflowStep?

Finds a WorkflowStep with the given name by traversing the DAG efficiently

Parameters:

  • steps (Array<WorkflowStep>)

    Collection of steps to search through

  • name (String)

    Name of the step to find

Returns:

  • (WorkflowStep, nil)

    The first matching step found or nil if none exists



195
196
197
# File 'app/models/tasker/workflow_step.rb', line 195

# Looks up a step by name by delegating to +StepFinder.find_by_name+.
#
# @param steps [Array<WorkflowStep>] collection of steps handed to the finder
# @param name [String] name of the step to find
# @return [WorkflowStep, nil] whatever +StepFinder.find_by_name+ returns
#   (documented as the first match, or nil when none exists)
def self.find_step_by_name(steps, name)
  StepFinder.find_by_name(steps, name)
end

.for_tasks_since(since_time) ⇒ ActiveRecord::Relation

Scopes workflow steps for tasks created since a specific time

Parameters:

  • since_time (Time)

    The earliest task creation time to include

Returns:

  • (ActiveRecord::Relation)

    Steps for tasks created since the specified time



132
133
134
# File 'app/models/tasker/workflow_step.rb', line 132

# Steps belonging to tasks whose +created_at+ is strictly after +since_time+.
#
# @param since_time [Time] exclusive lower bound on +tasker_tasks.created_at+
# @return [ActiveRecord::Relation]
scope :for_tasks_since, lambda { |since_time|
  # Joins through the :task association so the filter can use the task's timestamp.
  joins(:task).where('tasker_tasks.created_at > ?', since_time)
}

.get_steps_for_task(task, templates) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
# File 'app/models/tasker/workflow_step.rb', line 272

# Resolves one workflow step per template for +task+, reusing an existing
# step when there is one and building a default step otherwise, then wires
# up dependency edges between the resulting steps.
#
# For every template this also associates the matching named step with the
# task's named task.
#
# @param task [#task_id, #named_task] the task whose steps are wanted
# @param templates [Array<#name>] step templates, in the order steps should be returned
# @return [Array] the value returned by +set_up_dependent_steps+ (the steps, in template order)
def self.get_steps_for_task(task, templates)
  named_steps = NamedStep.create_named_steps_from_templates(templates)

  resolved_steps = templates.map do |template|
    # First named step whose name matches the template's name.
    matching_named_step = named_steps.find { |candidate| template.name == candidate.name }
    NamedTasksNamedStep.associate_named_step_with_named_task(task.named_task, template, matching_named_step)

    already_built = where(task_id: task.task_id, named_step_id: matching_named_step.named_step_id).first
    already_built || build_default_step!(task, template, matching_named_step)
  end

  set_up_dependent_steps(resolved_steps, templates)
end

.get_viable_steps(task, sequence) ⇒ Object



326
327
328
329
330
331
332
333
334
335
336
337
# File 'app/models/tasker/workflow_step.rb', line 326

# Returns the steps of +sequence+ that the readiness calculation reports as
# ready for execution.
#
# @param task [#task_id] the task the sequence belongs to
# @param sequence [#steps] sequence whose steps respond to +workflow_step_id+
# @return [ActiveRecord::Relation] ready steps with +named_step+ included
def self.get_viable_steps(task, sequence)
  candidate_ids = sequence.steps.map(&:workflow_step_id)

  # Readiness is computed for the whole batch of step IDs in one call.
  executable_ids = StepReadinessStatus
                   .for_task(task.task_id, candidate_ids)
                   .select(&:ready_for_execution)
                   .map(&:workflow_step_id)

  WorkflowStep.where(workflow_step_id: executable_ids).includes(:named_step)
end

.set_up_dependent_steps(steps, templates) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'app/models/tasker/workflow_step.rb', line 285

# Ensures a "provides" edge exists from every dependency's step to the step
# that depends on it.
#
# For each template with dependencies, the dependent step and each provider
# step are resolved by name. An edge is created only when the provider has no
# outgoing edge to the dependent step yet, so calling this repeatedly does not
# duplicate edges.
#
# Steps are indexed by name once per call (and only if some template actually
# has dependencies) instead of scanning +steps+ for every template and every
# dependency. As with a first-match scan, the first step carrying a given name
# wins.
#
# @param steps [Array<#name, #outgoing_edges, #add_provides_edge!>] steps to connect
# @param templates [Array<#name, #all_dependencies>] templates describing the dependencies
# @return [Array] the +steps+ argument, unchanged
# @raise [NoMethodError] when a dependency names a step that is not in +steps+
def self.set_up_dependent_steps(steps, templates)
  steps_by_name = nil

  templates.each do |template|
    dependencies = template.all_dependencies
    next if dependencies.empty?

    # `||=` inside the block keeps the first step seen for each name.
    steps_by_name ||= steps.each_with_object({}) { |step, index| index[step.name] ||= step }
    dependent_step = steps_by_name[template.name]

    dependencies.each do |dependency|
      provider_step = steps_by_name[dependency]
      next if provider_step.outgoing_edges.exists?(to_step: dependent_step)

      provider_step.add_provides_edge!(dependent_step)
    end
  end

  steps
end

.task_completion_stats(task) ⇒ Hash

Efficient method to get task completion statistics using ActiveRecord scopes. This avoids the N+1 query problem while working with the state machine system.

Parameters:

  • task (Task)

    The task to analyze

Returns:

  • (Hash)

    Hash with completion statistics and latest completion time



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'app/models/tasker/workflow_step.rb', line 141

# Summarizes how far along a task's steps are.
#
# Counts come from the +for_task+ scope and its +completed+ / +failed+
# sub-scopes. Pending is derived as total minus completed minus failed, so
# steps that do not match either sub-scope (for example steps without
# transitions yet) are counted as pending.
#
# @param task [Task] the task to analyze
# @return [Hash] with keys +:total_steps+, +:completed_steps+, +:failed_steps+,
#   +:pending_steps+, +:latest_completion_time+ (maximum +processed_at+ among
#   completed steps) and +:all_complete+ (true only when there is at least one
#   step and every step is completed)
def self.task_completion_stats(task)
  steps_for_task = for_task(task)

  total = steps_for_task.count
  completed_scope = steps_for_task.completed
  failed_scope = steps_for_task.failed

  done = completed_scope.count
  errored = failed_scope.count

  {
    total_steps: total,
    completed_steps: done,
    failed_steps: errored,
    pending_steps: total - done - errored,
    latest_completion_time: completed_scope.maximum(:processed_at),
    all_complete: total.positive? && done == total
  }
end

Instance Method Details

#add_provides_edge!(to_step) ⇒ Object



339
340
341
# File 'app/models/tasker/workflow_step.rb', line 339

# Creates an outgoing edge from this step to +to_step+, named with
# PROVIDES_EDGE_NAME.
#
# No check for an existing edge is made here.
#
# @param to_step [WorkflowStep] the step at the other end of the new edge
# @return [Object] the edge record returned by +create!+
# @raise whatever +create!+ raises when the edge cannot be persisted
def add_provides_edge!(to_step)
  outgoing_edges.create!(to_step: to_step, name: PROVIDES_EDGE_NAME)
end

#can_retry_now? ⇒ Boolean

Returns:

  • (Boolean)


420
421
422
423
424
425
426
427
# File 'app/models/tasker/workflow_step.rb', line 420

# Whether the step can be retried right now: it must be in error, be retry
# eligible, and not be waiting for a backoff period to expire.
#
# The checks run in that order and stop at the first one that fails.
#
# @return [Boolean]
def can_retry_now?
  return false unless in_error? && retry_eligible?

  !waiting_for_backoff?
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


371
372
373
374
# File 'app/models/tasker/workflow_step.rb', line 371

# Whether the readiness record reports this step's current state as CANCELLED.
#
# @return [Boolean] false when there is no readiness record
def cancelled?
  reported_state = step_readiness_status&.current_state
  reported_state == Constants::WorkflowStepStatuses::CANCELLED
end

#complete? ⇒ Boolean

Returns:

  • (Boolean)


348
349
350
351
352
353
354
# File 'app/models/tasker/workflow_step.rb', line 348

# Whether the readiness record reports this step as finished, i.e. its current
# state is either COMPLETE or RESOLVED_MANUALLY.
#
# @return [Boolean] false when there is no readiness record or no current state
def complete?
  reported_state = step_readiness_status&.current_state
  return false if reported_state.nil?

  finished_states = [
    Constants::WorkflowStepStatuses::COMPLETE,
    Constants::WorkflowStepStatuses::RESOLVED_MANUALLY
  ]
  finished_states.include?(reported_state)
end

#dependencies_satisfied? ⇒ Boolean

Function-based predicate methods

Returns:

  • (Boolean)


389
390
391
392
# File 'app/models/tasker/workflow_step.rb', line 389

# Whether the readiness record marks this step's dependencies as satisfied.
#
# @return [Boolean] the record's +dependencies_satisfied+ value; false when
#   there is no readiness record or the value is nil
def dependencies_satisfied?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.dependencies_satisfied || false
end

#has_retry_attempts? ⇒ Boolean

Returns:

  • (Boolean)


399
400
401
402
# File 'app/models/tasker/workflow_step.rb', line 399

# Whether the readiness record shows a positive number of attempts.
#
# @return [Boolean] false when there is no readiness record or no attempt count
def has_retry_attempts?
  attempts_made = step_readiness_status&.attempts
  attempts_made ? attempts_made.positive? : false
end

#in_error? ⇒ Boolean

Returns:

  • (Boolean)


366
367
368
369
# File 'app/models/tasker/workflow_step.rb', line 366

# Whether the readiness record reports this step's current state as ERROR.
#
# @return [Boolean] false when there is no readiness record
def in_error?
  reported_state = step_readiness_status&.current_state
  reported_state == Constants::WorkflowStepStatuses::ERROR
end

#in_progress? ⇒ Boolean

Returns:

  • (Boolean)


356
357
358
359
# File 'app/models/tasker/workflow_step.rb', line 356

# Whether the readiness record reports this step's current state as IN_PROGRESS.
#
# @return [Boolean] false when there is no readiness record
def in_progress?
  reported_state = step_readiness_status&.current_state
  reported_state == Constants::WorkflowStepStatuses::IN_PROGRESS
end

#leaf_step? ⇒ Boolean

Returns:

  • (Boolean)


434
435
436
437
# File 'app/models/tasker/workflow_step.rb', line 434

# Whether this step is a leaf of the DAG, i.e. its DAG relationship record
# reports a child count of zero.
#
# Always returns a real Boolean: a missing relationship record or a missing
# child count yields false rather than nil, matching the other predicates on
# this model.
#
# @return [Boolean]
def leaf_step?
  step_dag_relationship&.child_count&.zero? || false
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


361
362
363
364
# File 'app/models/tasker/workflow_step.rb', line 361

# Whether the readiness record reports this step's current state as PENDING.
#
# @return [Boolean] false when there is no readiness record
def pending?
  reported_state = step_readiness_status&.current_state
  reported_state == Constants::WorkflowStepStatuses::PENDING
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


383
384
385
386
# File 'app/models/tasker/workflow_step.rb', line 383

# Whether the readiness record marks this step as ready for execution.
#
# @return [Boolean] the record's +ready_for_execution+ value; false when
#   there is no readiness record or the value is nil
def ready?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.ready_for_execution || false
end

#ready_status? ⇒ Boolean

Returns:

  • (Boolean)


376
377
378
379
380
381
# File 'app/models/tasker/workflow_step.rb', line 376

# Whether this step's current state is NOT one of the states listed in
# Constants::UNREADY_WORKFLOW_STEP_STATUSES.
#
# When the readiness record or its current state is missing, the step is
# treated as PENDING for the purpose of this check.
#
# @return [Boolean]
def ready_status?
  effective_state = step_readiness_status&.current_state
  effective_state ||= Constants::WorkflowStepStatuses::PENDING

  !Constants::UNREADY_WORKFLOW_STEP_STATUSES.include?(effective_state)
end

#reload(options = nil) ⇒ Object



439
440
441
442
443
444
# File 'app/models/tasker/workflow_step.rb', line 439

# Reloads the record and drops the memoized readiness status so the next call
# to +step_readiness_status+ fetches a fresh one.
#
# Accepts and forwards the same optional +options+ argument as the parent
# implementation (for example <tt>reload(lock: true)</tt>), so callers that
# pass options do not get an ArgumentError from this override.
#
# @param options [Hash, nil] passed through unchanged to the parent +reload+
# @return [Object] whatever the parent +reload+ returns
def reload(options = nil)
  # Bare `super` forwards `options` to the parent implementation.
  super.tap do
    @step_readiness_status = nil # Reset cached readiness status
  end
end

#retry_eligible? ⇒ Boolean

Returns:

  • (Boolean)


394
395
396
397
# File 'app/models/tasker/workflow_step.rb', line 394

# Whether the readiness record marks this step as eligible for retry.
#
# @return [Boolean] the record's +retry_eligible+ value; false when there is
#   no readiness record or the value is nil
def retry_eligible?
  readiness = step_readiness_status
  return false if readiness.nil?

  readiness.retry_eligible || false
end

#retry_exhausted? ⇒ Boolean

Returns:

  • (Boolean)


404
405
406
407
408
409
410
411
# File 'app/models/tasker/workflow_step.rb', line 404

# Whether the number of attempts has reached the retry limit.
#
# A missing attempt count is treated as 0 and a missing retry limit as 3.
#
# @return [Boolean] false when there is no readiness record
def retry_exhausted?
  readiness = step_readiness_status
  return false unless readiness

  (readiness.attempts || 0) >= (readiness.retry_limit || 3)
end

#root_step? ⇒ Boolean

Returns:

  • (Boolean)


429
430
431
432
# File 'app/models/tasker/workflow_step.rb', line 429

# Whether this step has no parents according to the readiness record's
# +total_parents+ count.
#
# @return [Boolean] true when the count is zero, and also when the readiness
#   record or the count is missing
def root_step?
  parent_count = step_readiness_status&.total_parents
  parent_count ? parent_count.zero? : true
end

#state_machine ⇒ Object

State machine integration



172
173
174
175
176
177
178
# File 'app/models/tasker/workflow_step.rb', line 172

# Returns the state machine bound to this step, building it on first use and
# reusing the same instance on later calls.
#
# @return [Tasker::StateMachine::StepStateMachine]
def state_machine
  return @state_machine if @state_machine

  machine_options = {
    transition_class: Tasker::WorkflowStepTransition,
    association_name: :workflow_step_transitions
  }
  @state_machine = Tasker::StateMachine::StepStateMachine.new(self, **machine_options)
end

#status ⇒ Object

Status is now entirely managed by the state machine



181
182
183
184
185
186
187
188
189
# File 'app/models/tasker/workflow_step.rb', line 181

# Current status of the step.
#
# Unsaved records report the initial PENDING status without consulting the
# state machine; persisted records report the state machine's current state.
#
# @return [Object] PENDING for new records, otherwise +state_machine.current_state+
def status
  return Tasker::Constants::WorkflowStepStatuses::PENDING if new_record?

  state_machine.current_state
end

#step_readiness_status ⇒ Object

Helper method to get step readiness status using function-based approach



344
345
346
# File 'app/models/tasker/workflow_step.rb', line 344

# Readiness record for this step, looked up through
# +StepReadinessStatus.for_task+ for this step's task and ID.
#
# A found record is cached on the instance. A nil result is not cached, so
# the lookup runs again on the next call.
#
# @return [Object, nil] the first readiness record returned, or nil when none
def step_readiness_status
  return @step_readiness_status if @step_readiness_status

  @step_readiness_status = StepReadinessStatus.for_task(task_id, [workflow_step_id]).first
end

#waiting_for_backoff? ⇒ Boolean

Returns:

  • (Boolean)


413
414
415
416
417
418
# File 'app/models/tasker/workflow_step.rb', line 413

# Whether the readiness record's +next_retry_at+ is still in the future.
#
# @return [Boolean] false when there is no readiness record or no
#   +next_retry_at+ value
def waiting_for_backoff?
  retry_at = step_readiness_status&.next_retry_at
  return false unless retry_at

  retry_at > Time.current
end