Class: Tasker::WorkflowStep

Inherits:
ApplicationRecord show all
Defined in:
app/models/tasker/workflow_step.rb

Defined Under Namespace

Classes: StepFinder

Constant Summary collapse

PROVIDES_EDGE_NAME =
'provides'

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ApplicationRecord

configure_database_connections, database_configuration_exists?

Class Method Details

.build_default_step!(task, template, named_step) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'app/models/tasker/workflow_step.rb', line 300

def self.build_default_step!(task, template, named_step)
  # Create the step first without status
  step_attributes = {
    task_id: task.task_id,
    named_step_id: named_step.named_step_id,
    retryable: template.default_retryable,
    retry_limit: template.default_retry_limit,
    skippable: template.skippable,
    in_process: false,
    inputs: task.context,
    processed: false,
    attempts: 0,
    results: {}
  }

  step = new(step_attributes)
  step.save!

  # REMOVED: Automatic state machine initialization to prevent duplicate key violations
  # The state machine will initialize naturally when accessed, and factories may
  # have already created transitions through their own setup
  # step.state_machine.initialize_state_machine!

  step
end

.by_current_stateActiveRecord::Relation

Scopes workflow steps by their current state using state machine transitions



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'app/models/tasker/workflow_step.rb', line 88

scope :by_current_state, lambda { |state = nil|
  relation = joins("    INNER JOIN (\n      SELECT DISTINCT ON (workflow_step_id) workflow_step_id, to_state\n      FROM tasker_workflow_step_transitions\n      WHERE most_recent = true\n      ORDER BY workflow_step_id, sort_key DESC\n    ) current_transitions ON current_transitions.workflow_step_id = tasker_workflow_steps.workflow_step_id\n  SQL\n\n  if state.present?\n    relation.where(current_transitions: { to_state: state })\n  else\n    relation\n  end\n}\n".squish)

.completed_sinceActiveRecord::Relation

Scopes workflow steps completed since a specific time



110
111
112
113
114
# File 'app/models/tasker/workflow_step.rb', line 110

scope :completed_since, lambda { |since_time|
  joins(:workflow_step_transitions)
    .where('tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?', true, 'complete')
    .where('tasker_workflow_step_transitions.created_at > ?', since_time)
}

.failed_sinceActiveRecord::Relation

Scopes workflow steps that failed since a specific time



121
122
123
124
125
# File 'app/models/tasker/workflow_step.rb', line 121

scope :failed_since, lambda { |since_time|
  joins(:workflow_step_transitions)
    .where('tasker_workflow_step_transitions.most_recent = ? AND tasker_workflow_step_transitions.to_state = ?', true, 'error')
    .where('tasker_workflow_step_transitions.created_at > ?', since_time)
}

.find_step_by_name(steps, name) ⇒ WorkflowStep?

Finds a WorkflowStep with the given name by traversing the DAG efficiently



195
196
197
# File 'app/models/tasker/workflow_step.rb', line 195

def self.find_step_by_name(steps, name)
  StepFinder.find_by_name(steps, name)
end

.for_tasks_sinceActiveRecord::Relation

Scopes workflow steps for tasks created since a specific time



132
133
134
# File 'app/models/tasker/workflow_step.rb', line 132

scope :for_tasks_since, lambda { |since_time|
  joins(:task).where('tasker_tasks.created_at > ?', since_time)
}

.get_steps_for_task(task, templates) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
# File 'app/models/tasker/workflow_step.rb', line 272

def self.get_steps_for_task(task, templates)
  named_steps = NamedStep.create_named_steps_from_templates(templates)
  steps =
    templates.map do |template|
      named_step = named_steps.find { |ns| template.name == ns.name }
      NamedTasksNamedStep.associate_named_step_with_named_task(task.named_task, template, named_step)
      step = where(task_id: task.task_id, named_step_id: named_step.named_step_id).first
      step ||= build_default_step!(task, template, named_step)
      step
    end
  set_up_dependent_steps(steps, templates)
end

.get_viable_steps(task, sequence) ⇒ Object



326
327
328
329
330
331
332
333
334
335
336
337
# File 'app/models/tasker/workflow_step.rb', line 326

def self.get_viable_steps(task, sequence)
  # Get step IDs from sequence
  step_ids = sequence.steps.map(&:workflow_step_id)

  # Use function-based approach for high-performance readiness checking
  ready_statuses = StepReadinessStatus.for_task(task.task_id, step_ids)
  ready_step_ids = ready_statuses.select(&:ready_for_execution).map(&:workflow_step_id)

  # Return WorkflowStep objects for ready steps
  WorkflowStep.where(workflow_step_id: ready_step_ids)
              .includes(:named_step)
end

.set_up_dependent_steps(steps, templates) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'app/models/tasker/workflow_step.rb', line 285

def self.set_up_dependent_steps(steps, templates)
  templates.each do |template|
    next if template.all_dependencies.empty?

    dependent_step = steps.find { |step| step.name == template.name }
    template.all_dependencies.each do |dependency|
      provider_step = steps.find { |step| step.name == dependency }
      unless provider_step.outgoing_edges.exists?(to_step: dependent_step)
        provider_step.add_provides_edge!(dependent_step)
      end
    end
  end
  steps
end

.task_completion_stats(task) ⇒ Hash

Efficient method to get task completion statistics using ActiveRecord scopes This avoids the N+1 query problem while working with the state machine system



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'app/models/tasker/workflow_step.rb', line 141

def self.task_completion_stats(task)
  # Use efficient ActiveRecord queries with the state machine
  task_steps = for_task(task)

  # Get completion statistics with optimized queries
  total_steps = task_steps.count
  completed_steps = task_steps.completed
  failed_steps = task_steps.failed

  # Calculate counts
  completed_count = completed_steps.count
  failed_count = failed_steps.count

  # For pending count, calculate as total minus completed and failed
  # This handles the case where new steps don't have transitions yet
  pending_count = total_steps - completed_count - failed_count

  # Get latest completion time from completed steps
  latest_completion_time = completed_steps.maximum(:processed_at)

  {
    total_steps: total_steps,
    completed_steps: completed_count,
    failed_steps: failed_count,
    pending_steps: pending_count,
    latest_completion_time: latest_completion_time,
    all_complete: completed_count == total_steps && total_steps.positive?
  }
end

Instance Method Details

#add_provides_edge!(to_step) ⇒ Object



339
340
341
# File 'app/models/tasker/workflow_step.rb', line 339

def add_provides_edge!(to_step)
  outgoing_edges.create!(to_step: to_step, name: PROVIDES_EDGE_NAME)
end

#can_retry_now?Boolean



420
421
422
423
424
425
426
427
# File 'app/models/tasker/workflow_step.rb', line 420

def can_retry_now?
  # Comprehensive check if step can be retried right now
  return false unless in_error?
  return false unless retry_eligible?
  return false if waiting_for_backoff?

  true
end

#cancelled?Boolean



371
372
373
374
# File 'app/models/tasker/workflow_step.rb', line 371

def cancelled?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::CANCELLED
end

#complete?Boolean



348
349
350
351
352
353
354
# File 'app/models/tasker/workflow_step.rb', line 348

def complete?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state&.in?([
                                              Constants::WorkflowStepStatuses::COMPLETE,
                                              Constants::WorkflowStepStatuses::RESOLVED_MANUALLY
                                            ]) || false
end

#dependencies_satisfied?Boolean

Function-based predicate methods



389
390
391
392
# File 'app/models/tasker/workflow_step.rb', line 389

def dependencies_satisfied?
  # Use function-based approach's pre-calculated dependency analysis
  step_readiness_status&.dependencies_satisfied || false
end

#has_retry_attempts?Boolean



399
400
401
402
# File 'app/models/tasker/workflow_step.rb', line 399

def has_retry_attempts?
  # Check if step has made retry attempts
  (step_readiness_status&.attempts || 0).positive?
end

#in_error?Boolean



366
367
368
369
# File 'app/models/tasker/workflow_step.rb', line 366

def in_error?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::ERROR
end

#in_progress?Boolean



356
357
358
359
# File 'app/models/tasker/workflow_step.rb', line 356

def in_progress?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::IN_PROGRESS
end

#leaf_step?Boolean



434
435
436
437
# File 'app/models/tasker/workflow_step.rb', line 434

def leaf_step?
  # Check if this is a leaf step using DAG relationship view
  step_dag_relationship&.child_count&.zero?
end

#pending?Boolean



361
362
363
364
# File 'app/models/tasker/workflow_step.rb', line 361

def pending?
  # Use function-based approach for consistent state checking
  step_readiness_status&.current_state == Constants::WorkflowStepStatuses::PENDING
end

#ready?Boolean



383
384
385
386
# File 'app/models/tasker/workflow_step.rb', line 383

def ready?
  # Use function-based approach's comprehensive readiness calculation
  step_readiness_status&.ready_for_execution || false
end

#ready_status?Boolean



376
377
378
379
380
381
# File 'app/models/tasker/workflow_step.rb', line 376

def ready_status?
  # Use function-based approach for efficient ready status checking
  Constants::UNREADY_WORKFLOW_STEP_STATUSES.exclude?(
    step_readiness_status&.current_state || Constants::WorkflowStepStatuses::PENDING
  )
end

#reloadObject



439
440
441
442
443
444
# File 'app/models/tasker/workflow_step.rb', line 439

def reload
  # Override reload to ensure step readiness status is refreshed
  super.tap do
    @step_readiness_status = nil # Reset cached readiness status
  end
end

#retry_eligible?Boolean



394
395
396
397
# File 'app/models/tasker/workflow_step.rb', line 394

def retry_eligible?
  # Use function-based approach's retry/backoff calculation
  step_readiness_status&.retry_eligible || false
end

#retry_exhausted?Boolean



404
405
406
407
408
409
410
411
# File 'app/models/tasker/workflow_step.rb', line 404

def retry_exhausted?
  # Check if step has exhausted retry attempts
  return false unless step_readiness_status

  attempts = step_readiness_status.attempts || 0
  retry_limit = step_readiness_status.retry_limit || 3
  attempts >= retry_limit
end

#root_step?Boolean



429
430
431
432
# File 'app/models/tasker/workflow_step.rb', line 429

def root_step?
  # Check if this is a root step (no dependencies)
  (step_readiness_status&.total_parents || 0).zero?
end

#state_machineObject

State machine integration



172
173
174
175
176
177
178
# File 'app/models/tasker/workflow_step.rb', line 172

def state_machine
  @state_machine ||= Tasker::StateMachine::StepStateMachine.new(
    self,
    transition_class: Tasker::WorkflowStepTransition,
    association_name: :workflow_step_transitions
  )
end

#statusObject

Status is now entirely managed by the state machine



181
182
183
184
185
186
187
188
189
# File 'app/models/tasker/workflow_step.rb', line 181

def status
  if new_record?
    # For new records, return the initial state
    Tasker::Constants::WorkflowStepStatuses::PENDING
  else
    # For persisted records, use state machine
    state_machine.current_state
  end
end

#step_readiness_statusObject

Helper method to get step readiness status using function-based approach



344
345
346
# File 'app/models/tasker/workflow_step.rb', line 344

def step_readiness_status
  @step_readiness_status ||= StepReadinessStatus.for_task(task_id, [workflow_step_id]).first
end

#waiting_for_backoff?Boolean



413
414
415
416
417
418
# File 'app/models/tasker/workflow_step.rb', line 413

def waiting_for_backoff?
  # Check if step is waiting for backoff period to expire
  return false unless step_readiness_status&.next_retry_at

  step_readiness_status.next_retry_at > Time.current
end