Class: Tasker::Orchestration::StepExecutor

Inherits:
Object
  • Object
show all
Includes:
Concerns::EventPublisher, Concerns::IdempotentStateTransitions, Concerns::StructuredLogging
Defined in:
lib/tasker/orchestration/step_executor.rb

Overview

StepExecutor handles the execution of workflow steps with concurrent processing

This class provides the implementation for step execution while preserving the original concurrent processing capabilities using concurrent-ruby. It fires lifecycle events for observability.

Enhanced with structured logging and performance monitoring for production observability.

Constant Summary

Constants included from Concerns::StructuredLogging

Concerns::StructuredLogging::CORRELATION_ID_KEY

Instance Method Summary collapse

Methods included from Concerns::StructuredLogging

#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::IdempotentStateTransitions

#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to

Instance Method Details

#execute_single_step(task, sequence, step, task_handler) ⇒ Tasker::WorkflowStep?

Execute a single step with state machine transitions and error handling

Enhanced with structured logging and performance monitoring.

Parameters:

Returns:



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/tasker/orchestration/step_executor.rb', line 156

def execute_single_step(task, sequence, step, task_handler)
  step_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  log_step_event(step, :execution_starting,
                 task_id: task.task_id,
                 step_status: step.status,
                 attempt_count: step.attempts)

  # Guard clauses - fail fast if preconditions aren't met
  return nil unless validate_step_preconditions_with_logging(step)
  return nil unless ensure_step_has_initial_state_with_logging(step)
  return nil unless step_ready_for_execution_with_logging?(step)

  # Main execution workflow with monitoring
  result = execute_step_workflow_with_monitoring(task, sequence, step, task_handler, step_start_time)

  step_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start_time

  if result
    log_performance_event('single_step_execution', step_duration,
                          task_id: task.task_id,
                          step_id: step.workflow_step_id,
                          step_name: step.name,
                          result: 'success',
                          attempt_count: step.attempts)
  else
    log_performance_event('single_step_execution', step_duration,
                          task_id: task.task_id,
                          step_id: step.workflow_step_id,
                          step_name: step.name,
                          result: 'failure',
                          attempt_count: step.attempts)
  end

  result
rescue StandardError => e
  step_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start_time

  # Log unexpected errors that occur outside the normal workflow
  step_id = step&.workflow_step_id
  log_exception(e, context: {
                  step_id: step_id,
                  task_id: task&.task_id,
                  operation: 'single_step_execution',
                  duration: step_duration
                })

  Rails.logger.error("StepExecutor: Unexpected error in execute_single_step for step #{step_id}: #{e.message}")
  nil
end

#execute_steps(task, sequence, viable_steps, task_handler) ⇒ Array<Tasker::WorkflowStep>

Execute a collection of viable steps

This method preserves the original concurrent processing logic while adding observability through lifecycle events and structured logging.

Parameters:

Returns:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/tasker/orchestration/step_executor.rb', line 42

def execute_steps(task, sequence, viable_steps, task_handler)
  return [] if viable_steps.empty?

  execution_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Always use concurrent processing - sequential mode has been deprecated
  processing_mode = 'concurrent'

  log_orchestration_event('step_batch_execution', :started,
                          task_id: task.task_id,
                          step_count: viable_steps.size,
                          processing_mode: processing_mode,
                          step_names: viable_steps.map(&:name))

  # Fire observability event through orchestrator
  publish_steps_execution_started(
    task,
    step_count: viable_steps.size,
    processing_mode: processing_mode
  )

  # Always use concurrent processing with dynamic concurrency optimization
  processed_steps = execute_steps_concurrently_with_monitoring(task, sequence, viable_steps, task_handler)

  execution_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - execution_start_time
  successful_count = processed_steps.count do |s|
    s&.status == Tasker::Constants::WorkflowStepStatuses::COMPLETE
  end

  # Log performance metrics
  log_performance_event('step_batch_execution', execution_duration,
                        task_id: task.task_id,
                        step_count: viable_steps.size,
                        processed_count: processed_steps.size,
                        successful_count: successful_count,
                        failure_count: processed_steps.size - successful_count,
                        processing_mode: processing_mode)

  # Fire completion event through orchestrator
  publish_steps_execution_completed(
    task,
    processed_count: processed_steps.size,
    successful_count: successful_count
  )

  log_orchestration_event('step_batch_execution', :completed,
                          task_id: task.task_id,
                          processed_count: processed_steps.size,
                          successful_count: successful_count,
                          failure_count: processed_steps.size - successful_count,
                          duration_ms: (execution_duration * 1000).round(2))

  processed_steps.compact
end

#execution_configObject

Configuration-driven execution settings These delegate to Tasker::Configuration.configuration.execution for configurable values while maintaining architectural constants for Ruby-specific optimizations



28
29
30
# File 'lib/tasker/orchestration/step_executor.rb', line 28

def execution_config
  @execution_config ||= Tasker::Configuration.configuration.execution
end

#handle_viable_steps_discovered(event) ⇒ Object

Handle viable steps discovered event

Convenience method for event-driven workflows that takes an event payload and executes the discovered steps.

Parameters:

  • event (Hash)

    Event payload with task_id, step_ids, and processing_mode



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/tasker/orchestration/step_executor.rb', line 126

def handle_viable_steps_discovered(event)
  task_id = event[:task_id]
  step_ids = event[:step_ids] || []

  return [] if step_ids.empty?

  with_correlation_id(event[:correlation_id]) do
    log_orchestration_event('event_driven_execution', :started,
                            task_id: task_id,
                            step_ids: step_ids,
                            trigger: 'viable_steps_discovered')

    task = Tasker::Task.find(task_id)
    task_handler = Tasker::HandlerFactory.instance.get(task.name)
    sequence = Tasker::Orchestration::StepSequenceFactory.get_sequence(task, task_handler)
    viable_steps = task.workflow_steps.where(workflow_step_id: step_ids)

    execute_steps(task, sequence, viable_steps, task_handler)
  end
end

#max_concurrent_stepsInteger

Calculate optimal concurrency based on system health and resources

This method dynamically determines the maximum number of steps that can be executed concurrently based on current system load, database connections, and other health metrics. Now enhanced with ConnectionPoolIntelligence for Rails-aware connection management.

Returns:

  • (Integer)

    Optimal number of concurrent steps (between configured min and max)



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/tasker/orchestration/step_executor.rb', line 105

def max_concurrent_steps
  # Return cached value if still valid
  cache_duration = execution_config.concurrency_cache_duration.seconds
  if @max_concurrent_steps && @concurrency_calculated_at &&
     (Time.current - @concurrency_calculated_at) < cache_duration
    return @max_concurrent_steps
  end

  # Calculate new concurrency level using enhanced intelligence
  @max_concurrent_steps = calculate_optimal_concurrency
  @concurrency_calculated_at = Time.current

  @max_concurrent_steps
end