Class: Tasker::Orchestration::WorkflowCoordinator
- Inherits:
-
Object
- Object
- Tasker::Orchestration::WorkflowCoordinator
- Defined in:
- lib/tasker/orchestration/workflow_coordinator.rb
Overview
WorkflowCoordinator handles the main task execution loop
This coordinator extracts the proven loop-based execution logic from TaskHandler and provides a strategy pattern for composition with different reenqueuer strategies. This enables proper testing of the complete workflow execution path.
Enhanced with structured logging and performance monitoring for production observability.
Constant Summary
Constants included from Concerns::StructuredLogging
Concerns::StructuredLogging::CORRELATION_ID_KEY
Instance Attribute Summary collapse
-
#reenqueuer_strategy ⇒ Object
readonly
Returns the value of attribute reenqueuer_strategy.
Instance Method Summary collapse
-
#execute_workflow(task, task_handler) ⇒ void
Execute the complete workflow for a task.
-
#initialize(reenqueuer_strategy: nil) ⇒ WorkflowCoordinator
constructor
Initialize coordinator with reenqueuer strategy.
Methods included from Concerns::EventPublisher
#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started
Methods included from Concerns::StructuredLogging
#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id
Constructor Details
#initialize(reenqueuer_strategy: nil) ⇒ WorkflowCoordinator
Initialize coordinator with reenqueuer strategy
21 22 23 |
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 21 def initialize(reenqueuer_strategy: nil) @reenqueuer_strategy = reenqueuer_strategy || default_reenqueuer_strategy end |
Instance Attribute Details
#reenqueuer_strategy ⇒ Object (readonly)
Returns the value of attribute reenqueuer_strategy.
16 17 18 |
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 16 def reenqueuer_strategy @reenqueuer_strategy end |
Instance Method Details
#execute_workflow(task, task_handler) ⇒ void
This method returns an undefined value.
Execute the complete workflow for a task
This method contains the proven loop logic extracted from TaskHandler#handle and delegates to orchestration components for implementation details.
Enhanced with correlation ID propagation and structured logging.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/tasker/orchestration/workflow_coordinator.rb', line 35 def execute_workflow(task, task_handler) # Establish correlation ID for the entire workflow execution workflow_correlation_id = correlation_id with_correlation_id(workflow_correlation_id) do log_orchestration_event('workflow_execution', :started, task_id: task.task_id, task_name: task.name, correlation_id: workflow_correlation_id) # Publish workflow started event publish_workflow_task_started(task.task_id, correlation_id: workflow_correlation_id) # Execute the main workflow loop with performance monitoring execute_workflow_with_monitoring(task, task_handler) end rescue StandardError => e log_exception(e, context: { task_id: task.task_id, operation: 'workflow_execution', correlation_id: workflow_correlation_id }) raise end |