Class: Tasker::Orchestration::TaskInitializer

Inherits:
Object
  • Object
show all
Includes:
Concerns::EventPublisher, Concerns::IdempotentStateTransitions
Defined in:
lib/tasker/orchestration/task_initializer.rb

Overview

TaskInitializer handles task creation and initialization logic

This component is responsible for:

  • Creating tasks from TaskRequest objects
  • Validating task context against schemas
  • Starting task execution (transitioning to in_progress)
  • Enqueuing tasks for processing

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::IdempotentStateTransitions

#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to

Class Method Details

.initialize_task!Tasker::Task

Initialize a new task from a task request

Creates a task record, validates the context against the schema, and enqueues the task for processing.

Parameters:

  • task_request (Tasker::Types::TaskRequest)

    The task request

  • task_handler (Object)

    The task handler instance for schema validation

Returns:



29
# File 'lib/tasker/orchestration/task_initializer.rb', line 29

delegate :initialize_task!, to: :new

.start_task!Boolean

Start a task's execution

Updates the task status to IN_PROGRESS and fires the appropriate event.

Parameters:

Returns:

  • (Boolean)

    True if the task was started successfully



37
# File 'lib/tasker/orchestration/task_initializer.rb', line 37

delegate :start_task!, to: :new

Instance Method Details

#initialize_task!(task_request, task_handler) ⇒ Tasker::Task

Initialize a new task from a task request

Parameters:

  • task_request (Tasker::Types::TaskRequest)

    The task request

  • task_handler (Object)

    The task handler instance for schema validation

Returns:



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/tasker/orchestration/task_initializer.rb', line 45

def initialize_task!(task_request, task_handler)
  task = nil
  context_errors = validate_context_with_handler(task_request.context, task_handler)

  if context_errors.length.positive?
    task = Tasker::Task.from_task_request(task_request)
    context_errors.each do |error|
      task.errors.add(:context, error)
    end

    # Use clean API for task initialization failure
    publish_task_failed(
      task,
      error_message: context_errors.join(', '),
      initialization_failed: true
    )
    return task
  end

  Tasker::Task.transaction do
    task = Tasker::Task.create_with_defaults!(task_request)
    # Get sequence and establish dependencies
    StepSequenceFactory.create_sequence_for_task!(task, task_handler)
  end

  # Use clean API for task initialization success
  publish_task_started(
    task,
    step_count: task.workflow_steps.count
  )

  enqueue_task(task)
  task
end

#start_task!(task) ⇒ Boolean

Start a task's execution

Parameters:

Returns:

  • (Boolean)

    True if the task was started successfully

Raises:



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/tasker/orchestration/task_initializer.rb', line 84

def start_task!(task)
  raise(Tasker::ProceduralError, "task already complete for task #{task.task_id}") if task.complete

  unless task.status == Tasker::Constants::TaskStatuses::PENDING
    raise(Tasker::ProceduralError,
          "task is not pending for task #{task.task_id}, status is #{task.status}")
  end

  task.context = ActiveSupport::HashWithIndifferentAccess.new(task.context)

  # Use state machine to transition task to in_progress
  unless safe_transition_to(task, Tasker::Constants::TaskStatuses::IN_PROGRESS, {
                              initialization_completed: true,
                              step_dependencies_established: task.workflow_steps.count
                            })

    # Use clean API for task start failure
    publish_task_failed(
      task,
      error_message: 'Failed to transition to in_progress',
      initialization_failed: true
    )

    return false
  end

  # Use clean API for task start success
  publish_task_started(task, task_context: task.context)

  true
end