Class: Tasker::Orchestration::StepExecutor
- Inherits:
- Object
- Inheritance hierarchy:
- Object, Tasker::Orchestration::StepExecutor
- Includes:
- Concerns::EventPublisher, Concerns::IdempotentStateTransitions, Concerns::StructuredLogging
- Defined in:
- lib/tasker/orchestration/step_executor.rb
Overview
StepExecutor handles the execution of workflow steps with concurrent processing.
This class provides the implementation for step execution while preserving the original concurrent processing capabilities using concurrent-ruby. It fires lifecycle events for observability.
Enhanced with structured logging and performance monitoring for production observability.
Constant Summary
Constants included from Concerns::StructuredLogging
Concerns::StructuredLogging::CORRELATION_ID_KEY
Instance Method Summary
- #execute_single_step(task, sequence, step, task_handler) ⇒ Tasker::WorkflowStep?
  Execute a single step with state machine transitions and error handling.
- #execute_steps(task, sequence, viable_steps, task_handler) ⇒ Array<Tasker::WorkflowStep>
  Execute a collection of viable steps.
- #execution_config ⇒ Object
  Configuration-driven execution settings. These delegate to Tasker::Configuration.configuration.execution for configurable values while maintaining architectural constants for Ruby-specific optimizations.
- #handle_viable_steps_discovered(event) ⇒ Object
  Handle viable steps discovered event.
- #max_concurrent_steps ⇒ Integer
  Calculate optimal concurrency based on system health and resources.
Methods included from Concerns::StructuredLogging
#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id
Methods included from Concerns::EventPublisher
#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started
Methods included from Concerns::IdempotentStateTransitions
#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to
Instance Method Details
#execute_single_step(task, sequence, step, task_handler) ⇒ Tasker::WorkflowStep?
Execute a single step with state machine transitions and error handling
Enhanced with structured logging and performance monitoring.
    # File 'lib/tasker/orchestration/step_executor.rb', line 156

    def execute_single_step(task, sequence, step, task_handler)
      step_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      log_step_event(step, :execution_starting,
                     task_id: task.task_id,
                     step_status: step.status,
                     attempt_count: step.attempts)

      # Guard clauses - fail fast if preconditions aren't met
      return nil unless validate_step_preconditions_with_logging(step)
      return nil unless ensure_step_has_initial_state_with_logging(step)
      return nil unless step_ready_for_execution_with_logging?(step)

      # Main execution workflow with monitoring
      result = execute_step_workflow_with_monitoring(task, sequence, step, task_handler, step_start_time)

      step_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start_time

      if result
        log_performance_event('single_step_execution', step_duration,
                              task_id: task.task_id,
                              step_id: step.workflow_step_id,
                              step_name: step.name,
                              result: 'success',
                              attempt_count: step.attempts)
      else
        log_performance_event('single_step_execution', step_duration,
                              task_id: task.task_id,
                              step_id: step.workflow_step_id,
                              step_name: step.name,
                              result: 'failure',
                              attempt_count: step.attempts)
      end

      result
    rescue StandardError => e
      step_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start_time

      # Log unexpected errors that occur outside the normal workflow
      step_id = step&.workflow_step_id
      log_exception(e, context: {
                      step_id: step_id,
                      task_id: task&.task_id,
                      operation: 'single_step_execution',
                      duration: step_duration
                    })

      Rails.logger.error("StepExecutor: Unexpected error in execute_single_step for step #{step_id}: #{e.message}")
      nil
    end
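The timing pattern used by #execute_single_step can be illustrated in isolation. The following is a minimal sketch in plain Ruby, not Tasker code: `timed_execution` and the shape of the hash it returns are hypothetical names chosen for this example. It shows why the monotonic clock is read once at the start and again on both the normal and the rescue path, so a duration is available even when the block raises.

```ruby
# Hypothetical helper, not part of Tasker: times a block with the monotonic
# clock and classifies the outcome the way execute_single_step does
# (truthy result => 'success', falsy result => 'failure', exception => 'error').
def timed_execution
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  { result: result, outcome: result ? 'success' : 'failure', duration: duration }
rescue StandardError => e
  # The start time is still in scope here, so the elapsed time can be reported
  # for unexpected errors as well.
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  { result: nil, outcome: 'error', error: e.message, duration: duration }
end

report = timed_execution { :step_finished }
puts "#{report[:outcome]} in #{(report[:duration] * 1000).round(2)} ms"
```

CLOCK_MONOTONIC is preferable to wall-clock time for durations because it is not affected by system clock adjustments.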
#execute_steps(task, sequence, viable_steps, task_handler) ⇒ Array<Tasker::WorkflowStep>
Execute a collection of viable steps
This method preserves the original concurrent processing logic while adding observability through lifecycle events and structured logging.
    # File 'lib/tasker/orchestration/step_executor.rb', line 42

    def execute_steps(task, sequence, viable_steps, task_handler)
      return [] if viable_steps.empty?

      execution_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # Always use concurrent processing - sequential mode has been deprecated
      processing_mode = 'concurrent'

      log_orchestration_event('step_batch_execution', :started,
                              task_id: task.task_id,
                              step_count: viable_steps.size,
                              processing_mode: processing_mode,
                              step_names: viable_steps.map(&:name))

      # Fire observability event through orchestrator
      publish_steps_execution_started(
        task,
        step_count: viable_steps.size,
        processing_mode: processing_mode
      )

      # Always use concurrent processing with dynamic concurrency optimization
      processed_steps = execute_steps_concurrently_with_monitoring(task, sequence, viable_steps, task_handler)

      execution_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - execution_start_time
      successful_count = processed_steps.count do |s|
        s&.status == Tasker::Constants::WorkflowStepStatuses::COMPLETE
      end

      # Log performance metrics
      log_performance_event('step_batch_execution', execution_duration,
                            task_id: task.task_id,
                            step_count: viable_steps.size,
                            processed_count: processed_steps.size,
                            successful_count: successful_count,
                            failure_count: processed_steps.size - successful_count,
                            processing_mode: processing_mode)

      # Fire completion event through orchestrator
      publish_steps_execution_completed(
        task,
        processed_count: processed_steps.size,
        successful_count: successful_count
      )

      log_orchestration_event('step_batch_execution', :completed,
                              task_id: task.task_id,
                              processed_count: processed_steps.size,
                              successful_count: successful_count,
                              failure_count: processed_steps.size - successful_count,
                              duration_ms: (execution_duration * 1000).round(2))

      processed_steps.compact
    end
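One detail of #execute_steps is easy to miss: the counts are computed before `compact` is applied. The safe navigation (`s&.status`) and the final `compact` suggest that the concurrent executor may return nil for some steps, in which case those entries count toward processed_count and failure_count but are absent from the returned array. The sketch below reproduces only that arithmetic. `FakeStep`, `COMPLETE`, and `summarize_batch` are stand-ins invented for this example, and the status string is an assumed value.

```ruby
# Stand-in for Tasker::WorkflowStep; only the status attribute matters here.
FakeStep = Struct.new(:name, :status)

# Assumed value standing in for Tasker::Constants::WorkflowStepStatuses::COMPLETE.
COMPLETE = 'complete'

# Hypothetical helper mirroring the counting logic in execute_steps.
def summarize_batch(processed_steps)
  successful = processed_steps.count { |s| s&.status == COMPLETE }
  {
    processed_count: processed_steps.size,            # includes nil entries
    successful_count: successful,
    failure_count: processed_steps.size - successful, # nil entries count as failures
    returned: processed_steps.compact                 # nil entries dropped
  }
end

summary = summarize_batch([FakeStep.new('fetch', COMPLETE), nil, FakeStep.new('store', 'error')])
puts summary[:processed_count]  # 3
puts summary[:failure_count]    # 2
puts summary[:returned].size    # 2
```

Callers comparing the logged processed_count against the size of the returned array should therefore expect the two to differ whenever a step result was nil.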
#execution_config ⇒ Object
Configuration-driven execution settings. These delegate to Tasker::Configuration.configuration.execution for configurable values while maintaining architectural constants for Ruby-specific optimizations.
    # File 'lib/tasker/orchestration/step_executor.rb', line 28

    def execution_config
      @execution_config ||= Tasker::Configuration.configuration.execution
    end
#handle_viable_steps_discovered(event) ⇒ Object
Handle viable steps discovered event
Convenience method for event-driven workflows that takes an event payload and executes the discovered steps.
    # File 'lib/tasker/orchestration/step_executor.rb', line 126

    def handle_viable_steps_discovered(event)
      task_id = event[:task_id]
      step_ids = event[:step_ids] || []

      return [] if step_ids.empty?

      with_correlation_id(event[:correlation_id]) do
        log_orchestration_event('event_driven_execution', :started,
                                task_id: task_id,
                                step_ids: step_ids,
                                trigger: 'viable_steps_discovered')

        task = Tasker::Task.find(task_id)
        task_handler = Tasker::HandlerFactory.instance.get(task.name)
        sequence = Tasker::Orchestration::StepSequenceFactory.get_sequence(task, task_handler)
        viable_steps = task.workflow_steps.where(workflow_step_id: step_ids)

        execute_steps(task, sequence, viable_steps, task_handler)
      end
    end
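The method reads three keys from the event: :task_id, :step_ids, and :correlation_id. The sketch below shows only how the step ID lookup behaves, assuming the event is a plain Hash (the real event object may be a different type that also responds to `[]`). `extract_step_ids` is a hypothetical helper and the IDs are made up for illustration.

```ruby
# Hypothetical helper mirroring `event[:step_ids] || []` from the method above.
def extract_step_ids(event)
  event[:step_ids] || []
end

# A payload with symbol keys yields the step IDs.
symbol_event = { task_id: 42, step_ids: [101, 102], correlation_id: 'req-abc123' }
p extract_step_ids(symbol_event)

# A payload without :step_ids falls back to an empty array, which makes the
# method return [] before any database lookup.
p extract_step_ids({ task_id: 42 })

# With a plain Hash, string keys are not seen by a symbol lookup, so this
# payload is also treated as having no steps.
p extract_step_ids({ 'task_id' => 42, 'step_ids' => [101] })
```

If events can arrive with string keys, for example after JSON deserialization, they would need to be normalized to symbol keys before being passed in.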
#max_concurrent_steps ⇒ Integer
Calculate optimal concurrency based on system health and resources
This method dynamically determines the maximum number of steps that can be executed concurrently based on current system load, database connections, and other health metrics. Now enhanced with ConnectionPoolIntelligence for Rails-aware connection management.
    # File 'lib/tasker/orchestration/step_executor.rb', line 105

    def max_concurrent_steps
      # Return cached value if still valid
      cache_duration = execution_config.concurrency_cache_duration.seconds
      if @max_concurrent_steps && @concurrency_calculated_at &&
         (Time.current - @concurrency_calculated_at) < cache_duration
        return @max_concurrent_steps
      end

      # Calculate new concurrency level using enhanced intelligence
      @max_concurrent_steps = calculate_optimal_concurrency
      @concurrency_calculated_at = Time.current

      @max_concurrent_steps
    end
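The caching in #max_concurrent_steps is a time-to-live memoization: the value is recalculated only once the cached one is older than the configured duration. The following is a minimal sketch of that pattern in plain Ruby. `CachedConcurrency`, its `ttl_seconds` and `clock` parameters, and the calculator block are all hypothetical. It uses an injectable clock instead of ActiveSupport's `Time.current` so the behavior can be exercised without Rails.

```ruby
# Hypothetical class, not part of Tasker: caches a computed value for a fixed
# number of seconds, mirroring the check in max_concurrent_steps.
class CachedConcurrency
  # clock is any callable returning the current time; it defaults to Time.now.
  def initialize(ttl_seconds:, clock: -> { Time.now }, &calculator)
    @ttl_seconds = ttl_seconds
    @clock = clock
    @calculator = calculator
  end

  def value
    now = @clock.call
    # Return the cached value while it is still younger than the TTL.
    return @value if @value && @calculated_at && (now - @calculated_at) < @ttl_seconds

    @value = @calculator.call
    @calculated_at = now
    @value
  end
end

cache = CachedConcurrency.new(ttl_seconds: 30) { 5 } # 5 stands in for a real calculation
puts cache.value
```

Caching matters here because the calculation is described as depending on system load and database connection state, which would be comparatively expensive to query on every call.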