Class: Tasker::Orchestration::TaskFinalizer

Inherits:
Object
  • Object
show all
Includes:
Concerns::EventPublisher, Concerns::IdempotentStateTransitions
Defined in:
lib/tasker/orchestration/task_finalizer.rb

Overview

TaskFinalizer handles task completion and finalization logic

This class implements task finalization and fires lifecycle events for observability. It integrates with TaskExecutionContext so that finalization decisions are based on the task's current execution context.

Defined Under Namespace

Classes: BlockageChecker, ContextManager, DelayCalculator, FinalizationDecisionMaker, FinalizationProcessor, ReasonDeterminer, ReenqueueManager, UnclearStateHandler

Instance Method Summary collapse

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::IdempotentStateTransitions

#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to

Instance Method Details

#blocked_by_errors?(task, _sequence, _processed_steps) ⇒ Boolean

Check if the task is blocked by errors

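Parameters:

  • task

    The task to check; passed to BlockageChecker.blocked_by_errors?

  • _sequence

    Unused

  • _processed_steps

    Unused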

Returns:

  • (Boolean)

    True if task is blocked by errors



26
27
28
# File 'lib/tasker/orchestration/task_finalizer.rb', line 26

# Report whether the given task is blocked by errors.
#
# The answer comes entirely from BlockageChecker; the sequence and
# processed-steps arguments are accepted but not consulted.
#
# @param task the task to check
# @param _sequence unused
# @param _processed_steps unused
# @return [Boolean] true if the task is blocked by errors
def blocked_by_errors?(task, _sequence, _processed_steps)
  checker = BlockageChecker
  checker.blocked_by_errors?(task)
end

#complete_task(task, context) ⇒ Object

Service method exposed for FinalizationDecisionMaker



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/tasker/orchestration/task_finalizer.rb', line 62

# Drive a task to the complete state and publish the completion event.
#
# Service method exposed for FinalizationDecisionMaker.
#
# The task is walked through the intermediate states handled here
# (error -> pending -> in_progress -> complete). If safe_transition_to
# rejects any step, the method returns without publishing anything.
#
# @param task the task to complete; this method calls task_id,
#   state_machine, update and update! on it
# @param context execution context, may be nil; completion_percentage,
#   total_steps and health_status are read through safe navigation
# @return [Object, nil] nil when the task could not be completed; the
#   return value is otherwise not meaningful
def complete_task(task, context)
  payload = {
    task: task,
    completed_at: Time.current,
    completion_percentage: context&.completion_percentage,
    total_steps: context&.total_steps,
    health_status: context&.health_status
  }

  # A task with no recorded state is treated as pending
  current_state = task.state_machine.current_state || Constants::TaskStatuses::PENDING

  Rails.logger.debug { "TaskFinalizer: Task #{task.task_id} current state is #{current_state}" }
  # If task is already complete, just flag it and publish the event.
  # The task is passed positionally, matching every other publish_* call here.
  if current_state == Constants::TaskStatuses::COMPLETE
    return unless task.update({ complete: true })

    publish_task_completed(task, **payload)
    return
  end

  # Handle task in error state - must go through pending first
  if current_state == Constants::TaskStatuses::ERROR
    return unless safe_transition_to(task, Constants::TaskStatuses::PENDING)

    Rails.logger.debug { "TaskFinalizer: Task #{task.task_id} transitioning from error to pending" }
    # Reset current state to pending for next transition
    # This is necessary because the state machine might not allow direct transition from error to complete
    # without going through pending first
    current_state = Constants::TaskStatuses::PENDING
  end

  # Ensure task is in in_progress state (transition only if not already there)
  if current_state != Constants::TaskStatuses::IN_PROGRESS && !safe_transition_to(task,
                                                                                  Constants::TaskStatuses::IN_PROGRESS)
    return
  end

  # Now transition to complete - always attempt this transition
  Rails.logger.debug { "TaskFinalizer: Task #{task.task_id} transitioning to complete" }

  # ActiveRecord::Rollback is swallowed by the transaction block, so success
  # has to be tracked explicitly; otherwise a rolled-back completion would
  # still be announced below.
  completed = false
  ActiveRecord::Base.transaction do
    task.update!({ complete: true })
    unless safe_transition_to(task, Constants::TaskStatuses::COMPLETE)
      raise ActiveRecord::Rollback, "Failed to transition task #{task.task_id} to complete state"
    end

    completed = true
  end

  unless completed
    Rails.logger.warn("TaskFinalizer: Task #{task.task_id} could not be transitioned to complete; rolled back")
    return
  end

  publish_task_completed(task, **payload)
  # Adjacent literals joined with a backslash keep the message on one line
  Rails.logger.info(
    "TaskFinalizer: Task #{task.task_id} completed successfully (#{context&.total_steps} steps, " \
    "#{context&.completion_percentage}% complete)"
  )
end

#error_task(task, context) ⇒ Object

Service method exposed for FinalizationDecisionMaker



114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/tasker/orchestration/task_finalizer.rb', line 114

# Move a task into the error state and announce the failure.
#
# Service method exposed for FinalizationDecisionMaker.
#
# Nothing is published or logged when safe_transition_to does not return
# a truthy value.
#
# @param task the task to mark as errored; task_id is read for logging
# @param context execution context, may be nil; all fields are read
#   through safe navigation
# @return [Object, nil] nil when the transition was not performed
def error_task(task, context)
  moved_to_error = safe_transition_to(task, Constants::TaskStatuses::ERROR)
  return unless moved_to_error

  failure_details = {
    error_message: Constants::TaskFinalization::ErrorMessages::STEPS_IN_ERROR_STATE,
    failed_steps_count: context&.failed_steps,
    completion_percentage: context&.completion_percentage,
    health_status: context&.health_status,
    total_steps: context&.total_steps
  }
  publish_task_failed(task, **failure_details)

  Rails.logger.info("TaskFinalizer: Task #{task.task_id} failed - #{context&.failed_steps} failed steps, #{context&.health_status} health")
end

#finalize_task(task_id, synchronous: false) ⇒ Object

Finalize a task based on its current state using TaskExecutionContext

Parameters:

  • task_id (Integer)

    The task ID to finalize

  • synchronous (Boolean) (defaults to: false)

    Whether this is synchronous processing (default: false)



43
44
45
46
47
48
# File 'lib/tasker/orchestration/task_finalizer.rb', line 43

# Finalize a task based on its current state using TaskExecutionContext.
#
# Loads the task and its execution context, then hands both to
# FinalizationDecisionMaker together with this finalizer instance.
#
# @param task_id [Integer] the task ID to finalize
# @param synchronous [Boolean] whether this is synchronous processing
#   (default: false)
# @return [Object] whatever FinalizationDecisionMaker.make_finalization_decision returns
def finalize_task(task_id, synchronous: false)
  task_record = Tasker::Task.find(task_id)
  execution_context = ContextManager.get_task_execution_context(task_id)

  FinalizationDecisionMaker.make_finalization_decision(
    task_record,
    execution_context,
    synchronous,
    self
  )
end

#finalize_task_with_steps(task, _sequence, processed_steps) ⇒ Object

Finalize a task with processed steps

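Parameters:

  • task

    The task to finalize

  • _sequence

    Unused

  • processed_steps

    The processed steps, forwarded to FinalizationProcessor.finalize_with_steps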



35
36
37
# File 'lib/tasker/orchestration/task_finalizer.rb', line 35

# Finalize a task with processed steps.
#
# Forwards the task, the processed steps and this finalizer instance to
# FinalizationProcessor; the sequence argument is accepted but not used.
#
# @param task the task to finalize
# @param _sequence unused
# @param processed_steps the steps that were processed
# @return [Object] whatever FinalizationProcessor.finalize_with_steps returns
def finalize_task_with_steps(task, _sequence, processed_steps)
  finalizer = self
  FinalizationProcessor.finalize_with_steps(task, processed_steps, finalizer)
end

#handle_no_viable_steps(event) ⇒ Object

Handle no viable steps event

Convenience method for event-driven workflows when no viable steps are found. This triggers task finalization to determine next action.

Parameters:

  • event (Hash)

    Event payload with task_id



56
57
58
59
# File 'lib/tasker/orchestration/task_finalizer.rb', line 56

# Handle no viable steps event.
#
# Convenience method for event-driven workflows when no viable steps are
# found. This triggers task finalization to determine the next action.
#
# A bare task id (Integer or String) is used as-is. It must bypass the
# :task_id lookup because Integer#[] and String#[] raise TypeError when
# given a Symbol. Any other event is asked for event[:task_id]; if that
# is nil or false the event itself is passed on.
#
# @param event [Hash, Integer, String] event payload with :task_id, or a
#   bare task id
# @return [Object] whatever finalize_task returns
def handle_no_viable_steps(event)
  task_id =
    case event
    when Integer, String then event
    else event[:task_id] || event
    end

  finalize_task(task_id)
end

#pending_task(task, context, reason: nil) ⇒ Object

Service method exposed for FinalizationDecisionMaker



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/tasker/orchestration/task_finalizer.rb', line 130

# Move a task back to pending and announce the transition.
#
# Service method exposed for FinalizationDecisionMaker.
#
# Nothing is published or logged when safe_transition_to does not return
# a truthy value. When no reason is supplied, one is obtained from
# ReasonDeterminer.determine_pending_reason.
#
# @param task the task to set to pending; task_id is read for logging
# @param context execution context, may be nil; all fields are read
#   through safe navigation
# @param reason optional reason for the pending transition
# @return [Object, nil] nil when the transition was not performed
def pending_task(task, context, reason: nil)
  moved_to_pending = safe_transition_to(task, Constants::TaskStatuses::PENDING)
  return unless moved_to_pending

  reason = ReasonDeterminer.determine_pending_reason(context) unless reason

  transition_details = {
    reason: reason,
    ready_steps: context&.ready_steps,
    in_progress_steps: context&.in_progress_steps,
    completion_percentage: context&.completion_percentage,
    health_status: context&.health_status
  }
  publish_task_pending_transition(task, **transition_details)

  Rails.logger.info("TaskFinalizer: Task #{task.task_id} set to pending - #{reason} (ready_steps: #{context&.ready_steps}, in_progress: #{context&.in_progress_steps})")
end

#publish_finalization_completed(task, processed_steps, context) ⇒ Object

Service method exposed for event publishing



168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/tasker/orchestration/task_finalizer.rb', line 168

# Publish the task-finalization-completed event.
#
# Service method exposed for event publishing.
#
# @param task the task that was finalized
# @param processed_steps collection of processed steps; only its size is used
# @param context execution context, may be nil; all fields are read
#   through safe navigation
# @return [Object] whatever publish_task_finalization_completed returns
def publish_finalization_completed(task, processed_steps, context)
  event_details = {
    processed_steps_count: processed_steps.size,
    execution_status: context&.execution_status,
    health_status: context&.health_status,
    completion_percentage: context&.completion_percentage,
    total_steps: context&.total_steps,
    ready_steps: context&.ready_steps,
    failed_steps: context&.failed_steps,
    recommended_action: context&.recommended_action
  }

  publish_task_finalization_completed(task, **event_details)
end

#publish_finalization_started(task, processed_steps, context) ⇒ Object

Service method exposed for event publishing



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/tasker/orchestration/task_finalizer.rb', line 153

# Publish the task-finalization-started event.
#
# Service method exposed for event publishing.
#
# @param task the task being finalized
# @param processed_steps collection of processed steps; only its size is used
# @param context execution context, may be nil; all fields are read
#   through safe navigation
# @return [Object] whatever publish_task_finalization_started returns
def publish_finalization_started(task, processed_steps, context)
  event_details = {
    processed_steps_count: processed_steps.size,
    execution_status: context&.execution_status,
    health_status: context&.health_status,
    completion_percentage: context&.completion_percentage,
    total_steps: context&.total_steps,
    ready_steps: context&.ready_steps,
    failed_steps: context&.failed_steps,
    recommended_action: context&.recommended_action
  }

  publish_task_finalization_started(task, **event_details)
end

#reenqueue_task_with_context(task, context, reason: nil) ⇒ Object

Service method exposed for FinalizationDecisionMaker



148
149
150
# File 'lib/tasker/orchestration/task_finalizer.rb', line 148

# Re-enqueue a task, handing the work to ReenqueueManager.
#
# Service method exposed for FinalizationDecisionMaker.
#
# @param task the task to re-enqueue
# @param context execution context forwarded unchanged
# @param reason optional reason, forwarded positionally (nil when omitted)
# @return [Object] whatever ReenqueueManager.reenqueue_with_context returns
def reenqueue_task_with_context(task, context, reason: nil)
  manager = ReenqueueManager
  manager.reenqueue_with_context(task, context, reason)
end