Class: Tasker::Orchestration::WorkflowCoordinator

Inherits:
Object
  • Object
show all
Includes:
Concerns::EventPublisher, Concerns::StructuredLogging
Defined in:
lib/tasker/orchestration/workflow_coordinator.rb

Overview

WorkflowCoordinator handles the main task execution loop

This coordinator extracts the proven loop-based execution logic from TaskHandler and provides a strategy pattern for composition with different reenqueuer strategies. This enables proper testing of the complete workflow execution path.

Enhanced with structured logging and performance monitoring for production observability.

Constant Summary

Constants included from Concerns::StructuredLogging

Concerns::StructuredLogging::CORRELATION_ID_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::StructuredLogging

#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id

Constructor Details

#initialize(reenqueuer_strategy: nil) ⇒ WorkflowCoordinator

Initialize coordinator with reenqueuer strategy

Parameters:

  • reenqueuer_strategy (Object) (defaults to: nil)

    Strategy for handling task reenqueuing



21
22
23
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 21

def initialize(reenqueuer_strategy: nil)
  @reenqueuer_strategy = reenqueuer_strategy || default_reenqueuer_strategy
end

Instance Attribute Details

#reenqueuer_strategyObject (readonly)

Returns the value of attribute reenqueuer_strategy.



16
17
18
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 16

def reenqueuer_strategy
  @reenqueuer_strategy
end

Instance Method Details

#execute_workflow(task, task_handler) ⇒ void

This method returns an undefined value.

Execute the complete workflow for a task

This method contains the proven loop logic extracted from TaskHandler#handle and delegates to orchestration components for implementation details.

Enhanced with correlation ID propagation and structured logging.

Parameters:

  • task (Tasker::Task)

    The task to execute

  • task_handler (Object)

    The task handler instance for delegation



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 35

def execute_workflow(task, task_handler)
  # Establish correlation ID for the entire workflow execution
  workflow_correlation_id = correlation_id

  with_correlation_id(workflow_correlation_id) do
    log_orchestration_event('workflow_execution', :started,
                            task_id: task.task_id,
                            task_name: task.name,
                            correlation_id: workflow_correlation_id)

    # Publish workflow started event
    publish_workflow_task_started(task.task_id, correlation_id: workflow_correlation_id)

    # Execute the main workflow loop with performance monitoring
    execute_workflow_with_monitoring(task, task_handler)
  end
rescue StandardError => e
  log_exception(e, context: {
                  task_id: task.task_id,
                  operation: 'workflow_execution',
                  correlation_id: workflow_correlation_id
                })
  raise
end