Class: RubyReactor::Executor::StepExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/executor/step_executor.rb

Instance Method Summary collapse

Constructor Details

#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor

Returns a new instance of StepExecutor.



6
7
8
9
10
11
12
13
# File 'lib/ruby_reactor/executor/step_executor.rb', line 6

# Builds a step executor from the objects it collaborates with.
#
# @param context [Object] execution context, stored as given
# @param dependency_graph [Object] graph of steps, stored as given
# @param reactor_class [Object] reactor the steps belong to, stored as given
# @param managers [Hash] collaborators looked up under the keys
#   :retry_manager, :result_handler and :compensation_manager
def initialize(context:, dependency_graph:, reactor_class:, managers:)
  @context, @dependency_graph, @reactor_class = context, dependency_graph, reactor_class

  # The three managers arrive bundled in a single hash; unpack them in one go.
  @retry_manager, @result_handler, @compensation_manager =
    managers[:retry_manager], managers[:result_handler], managers[:compensation_manager]
end

Instance Method Details

#execute_all_steps ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/ruby_reactor/executor/step_executor.rb', line 15

# Drives the dependency graph to completion, one batch of ready steps at a time.
#
# Every step of a batch is run sequentially through #execute_step. A step
# outcome that is an AsyncResult, a RetryQueuedResult or a Failure stops the
# run right away and is returned unchanged; any other outcome (nil included)
# simply lets the loop move on to the next step.
#
# @return [Object] the halting step outcome, or the value of
#   @result_handler.final_result once the graph reports completion
# @raise [Error::DependencyError] when the graph is not complete but offers
#   no ready steps
def execute_all_steps
  until @dependency_graph.all_completed?
    batch = @dependency_graph.ready_steps

    if batch.empty?
      message = "No ready steps available but execution not complete"
      raise Error::DependencyError.new(message, context: @context)
    end

    # Steps inside a batch run one after another, in the order the graph gave them.
    batch.each do |step_config|
      outcome = execute_step(step_config)

      # Async hand-off, queued retry and failure all end the run early.
      halting = outcome.is_a?(RubyReactor::AsyncResult) ||
                outcome.is_a?(RetryQueuedResult) ||
                outcome.is_a?(RubyReactor::Failure)
      return outcome if halting
    end
  end

  @result_handler.final_result(@reactor_class)
end

#execute_step(step_config) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/ruby_reactor/executor/step_executor.rb', line 48

# Runs a single step, picking between the async hand-off and the retry path.
#
# A step goes through #execute_step_with_retry when it is not async, or when
# the context reports inline_async_execution (we are already inside the
# Worker, so async steps are treated as sync to avoid infinite recursion).
# Every other step is passed to #handle_async_step.
#
# @param step_config [Object] step definition; must respond to #async?
# @return [Object] whatever the chosen execution path returns
def execute_step(step_config)
  run_inline = !step_config.async? || @context.inline_async_execution
  return execute_step_with_retry(step_config) if run_inline

  handle_async_step(step_config)
end

#execute_step_sync(step_config) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/ruby_reactor/executor/step_executor.rb', line 137

# Executes one step inside @context.with_step and hands its result to the
# result handler.
#
# When step_config.should_run? rejects the step, the step is marked completed
# in the dependency graph and the method returns RubyReactor.Success(nil)
# straight away (a non-local return out of the with_step block).
#
# @param step_config [Object] step definition; must respond to #name and #should_run?
# @return [Object] RubyReactor.Success(nil) for a skipped step, otherwise
#   whatever @context.with_step returns for the block
def execute_step_sync(step_config)
  @context.with_step(step_config.name) do
    if step_config.should_run?(@context)
      arguments = resolve_arguments(step_config)

      # Validation happens before the implementation is invoked.
      validate_step_arguments(step_config, arguments)

      outcome = run_step_implementation(step_config, arguments)
      @result_handler.handle_step_result(step_config, outcome, arguments)
    else
      # Conditions/guards rejected the step: record it as done and bail out.
      @dependency_graph.complete_step(step_config.name)
      return RubyReactor.Success(nil)
    end
  end
end

#execute_step_sync_without_result_handling(step_config) ⇒ Object

Execute step without handling the result (used during retries)



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/ruby_reactor/executor/step_executor.rb', line 160

# Executes one step inside @context.with_step but leaves the result
# unhandled (used during retries).
#
# When step_config.should_run? rejects the step, the step is marked completed
# in the dependency graph and the method returns RubyReactor.Success(nil)
# straight away (a non-local return out of the with_step block).
#
# @param step_config [Object] step definition; must respond to #name and #should_run?
# @yield [arguments] the resolved arguments, before they are validated
# @return [Object] RubyReactor.Success(nil) for a skipped step, otherwise
#   whatever @context.with_step returns for the block
def execute_step_sync_without_result_handling(step_config)
  @context.with_step(step_config.name) do
    if step_config.should_run?(@context)
      arguments = resolve_arguments(step_config)

      # Let the caller see the arguments even if validation or the step raises.
      yield arguments if block_given?

      validate_step_arguments(step_config, arguments)
      run_step_implementation(step_config, arguments)
    else
      # Conditions/guards rejected the step: record it as done and bail out.
      @dependency_graph.complete_step(step_config.name)
      return RubyReactor.Success(nil)
    end
  end
end

#execute_step_with_retry(step_config) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/ruby_reactor/executor/step_executor.rb', line 105

# Runs the step through the retry manager and reports the outcome to the
# result handler.
#
# A RetryQueuedResult or AsyncResult outcome is returned untouched; every
# other outcome is first passed to @result_handler.handle_step_result
# together with the step's resolved arguments.
#
# NOTE(review): the arguments are resolved again here, outside the retry
# block — confirm resolve_arguments is safe to call a second time.
#
# @param step_config [Object] step definition
# @return [Object] the value produced by @retry_manager.execute_with_retry
def execute_step_with_retry(step_config)
  outcome = @retry_manager.execute_with_retry(step_config, @reactor_class) { safe_execute_step_sync(step_config) }

  # Deferred outcomes are not final, so the result handler must not see them.
  return outcome if outcome.is_a?(RetryQueuedResult) || outcome.is_a?(RubyReactor::AsyncResult)

  @result_handler.handle_step_result(step_config, outcome, resolve_arguments(step_config))
  outcome
end

#merge_executor_state(other_executor) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ruby_reactor/executor/step_executor.rb', line 59

# Folds the state of an async-executed executor back into this one.
#
# Our context is mutated IN PLACE rather than replaced, because the Executor
# also holds a reference to the same context object.
#
# @param other_executor [Object] executor that ran the async work; must
#   respond to #context, #undo_stack and #undo_trace
def merge_executor_state(other_executor)
  remote = other_executor.context

  # Copy every intermediate result over.
  remote.intermediate_results.each { |step_name, value| @context.set_result(step_name, value) }

  # The Worker's trace holds ALL steps, including the ones we already ran;
  # only the tail past our own trace length is new.
  already_traced = @context.execution_trace.length
  @context.execution_trace.concat(remote.execution_trace.drop(already_traced))

  @context.retry_context = remote.retry_context

  # The step we were on is finished from our point of view.
  @context.current_step = nil

  # Reflect completed steps in our dependency graph.
  remote.intermediate_results.each_key { |step_name| @dependency_graph.complete_step(step_name) }

  # A failed step has no result, so the other side's current step is completed explicitly.
  @dependency_graph.complete_step(remote.current_step) if remote.current_step

  # Merge undo stack items, skipping steps that are already on our stack.
  other_executor.undo_stack.each do |item|
    unseen = @compensation_manager.undo_stack.none? { |existing| existing[:step].name == item[:step].name }
    @compensation_manager.add_to_undo_stack(item) if unseen
  end

  # Append the other executor's undo trace to ours, entry by entry.
  other_executor.undo_trace.each { |entry| @compensation_manager.undo_trace << entry }
end

#safe_execute_step_sync(step_config) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/ruby_reactor/executor/step_executor.rb', line 118

# Executes the step without result handling and converts any StandardError
# into a RubyReactor::Failure instead of letting it propagate.
#
# The failure carries the step name, the context inputs, the names of the
# reactor inputs flagged with :redact, the reactor name, and the step
# arguments captured before the error (an empty hash when the error happened
# before the arguments were yielded).
#
# @param step_config [Object] step definition; must respond to #name
# @return [Object] the step's own return value, or the built failure
def safe_execute_step_sync(step_config)
  captured_arguments = {}
  execute_step_sync_without_result_handling(step_config) { |args| captured_arguments = args }
rescue StandardError => e
  # Collect the names of inputs whose configuration asks for redaction.
  redacted_names = @reactor_class.inputs.each_with_object([]) do |(input_name, input_config), names|
    names << input_name if input_config[:redact]
  end

  failure_details = {
    step_name: step_config.name,
    inputs: @context.inputs,
    redact_inputs: redacted_names,
    reactor_name: @reactor_class.name,
    step_arguments: captured_arguments
  }
  RubyReactor::Failure(e, **failure_details)
end