Class: Aidp::Execute::AsyncWorkLoopRunner

Inherits:
Object
  • Object
show all
Includes:
RescueLogging
Defined in:
lib/aidp/execute/async_work_loop_runner.rb

Overview

Asynchronous wrapper around WorkLoopRunner. Runs the work loop in a separate thread while maintaining REPL responsiveness.

Responsibilities:

  • Execute work loop in background thread

  • Monitor execution state (pause, resume, cancel)

  • Merge queued instructions at iteration boundaries

  • Stream output to main thread for display

  • Handle graceful cancellation with checkpoint save

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RescueLogging

__log_rescue_impl, log_rescue, #log_rescue

Constructor Details

#initialize(project_dir, provider_manager, config, options = {}) ⇒ AsyncWorkLoopRunner

Returns a new instance of AsyncWorkLoopRunner.



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 24

# Sets up a runner that has not started any work yet.
#
# @param project_dir [Object] stored as-is for later use
# @param provider_manager [Object] stored as-is for later use
# @param config [Object] stored as-is for later use
# @param options [Hash] optional overrides:
#   - :cancel_timeout    seconds to wait for graceful shutdown (default 5)
#   - :sync_runner_class class used as the synchronous runner
#                        (default WorkLoopRunner)
def initialize(project_dir, provider_manager, config, options = {})
  # Caller-supplied collaborators, kept untouched.
  @project_dir, @provider_manager, @config, @options =
    project_dir, provider_manager, config, options

  # Tunables; a missing or nil entry falls back to the default.
  timeout_override = options[:cancel_timeout]
  @cancel_timeout = timeout_override || 5
  runner_override = options[:sync_runner_class]
  @sync_runner_class = runner_override || WorkLoopRunner

  # Fresh per-runner state and instruction queue.
  @state = WorkLoopState.new
  @instruction_queue = InstructionQueue.new

  # No worker thread or synchronous runner until execution begins.
  @work_thread = nil
  @sync_runner = nil
end

Instance Attribute Details

#instruction_queue ⇒ Object (readonly)

Returns the value of attribute instruction_queue.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Read-only accessor for the instruction queue held in @instruction_queue.
#
# @return [Object] the value of @instruction_queue
def instruction_queue
  @instruction_queue
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Read-only accessor for the work loop state object held in @state.
#
# @return [Object] the value of @state
def state
  @state
end

#work_thread ⇒ Object (readonly)

Returns the value of attribute work_thread.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Read-only accessor for the background worker thread reference.
#
# @return [Thread, nil] the value of @work_thread; nil when no thread
#   reference is currently held
def work_thread
  @work_thread
end

Instance Method Details

#cancel(save_checkpoint: true) ⇒ Object

Cancel execution gracefully



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 90

# Requests cancellation of the work loop and waits a bounded time for the
# worker thread to stop.
#
# @param save_checkpoint [Boolean] when truthy, and a synchronous runner is
#   present, a cancellation checkpoint is saved after the wait
# @return [Hash, nil] {status: "cancelled", iteration: ...}, or nil when the
#   state is already cancelled or completed
def cancel(save_checkpoint: true)
  already_finished = @state.cancelled? || @state.completed?
  return if already_finished

  @state.cancel!
  @state.append_output("Cancellation requested, waiting for safe stopping point...", type: :warning)

  # Give the worker up to @cancel_timeout seconds to notice the request.
  worker = @work_thread
  worker.join(@cancel_timeout) unless worker.nil?

  checkpoint_wanted = save_checkpoint && @sync_runner
  if checkpoint_wanted
    @state.append_output("Saving checkpoint before exit...", type: :info)
    save_cancellation_checkpoint
  end

  current_iteration = @state.iteration
  {status: "cancelled", iteration: current_iteration}
end

#drain_output ⇒ Object

Get streaming output



120
121
122
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 120

# Returns the streaming output by delegating to the state object.
#
# @return [Object] whatever @state.drain_output returns
def drain_output
  @state.drain_output
end

#enqueue_instruction(content, type: :user_input, priority: :normal) ⇒ Object

Add instruction to queue (will be merged at next iteration)



108
109
110
111
112
113
114
115
116
117
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 108

# Queues an instruction so it can be merged at the next iteration.
#
# The instruction is recorded both in the instruction queue and on the
# state object.
#
# @param content [Object] instruction payload
# @param type [Symbol] forwarded to the instruction queue
# @param priority [Symbol] forwarded to the instruction queue
# @return [Hash] status, the queue's current count, and a user-facing message
def enqueue_instruction(content, type: :user_input, priority: :normal)
  @instruction_queue.enqueue(content, type: type, priority: priority)
  @state.enqueue_instruction(content)

  pending = @instruction_queue.count
  {status: "enqueued", queued_count: pending, message: "Instruction will be merged in next iteration"}
end

#execute_step_async(step_name, step_spec, context = {}) ⇒ Object

Start async work loop execution. Returns immediately; work continues in a background thread.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 39

# Starts the work loop on a background thread and returns immediately.
#
# @param step_name [Object] stored in @step_name for the background loop
# @param step_spec [Object] stored in @step_spec for the background loop
# @param context [Hash] stored in @context for the background loop
# @return [Hash] {status: "started", state: <state summary>}
# @raise [WorkLoopState::StateError] when the state is not idle
def execute_step_async(step_name, step_spec, context = {})
  unless @state.idle?
    raise WorkLoopState::StateError, "Work loop already running"
  end

  @state.start!
  @step_name, @step_spec, @context = step_name, step_spec, context

  @work_thread = Thread.new do
    begin
      run_async_loop
    rescue => error
      # Failures are logged, recorded on the state, and surfaced as output
      # rather than propagating out of the thread.
      log_rescue(error, component: "async_work_loop_runner", action: "thread_execution", fallback: "error_state", step: @step_name)
      @state.error!(error)
      @state.append_output("Work loop error: #{error.message}", type: :error)
    ensure
      # The worker drops its own reference once it is finished.
      @work_thread = nil
    end
  end

  # Yield so the new thread gets a chance to start.
  Thread.pass

  {status: "started", state: @state.summary}
end

#pause ⇒ Object

Pause execution



76
77
78
79
80
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 76

# Pauses the work loop if it is currently running.
#
# @return [Hash, nil] {status: "paused", iteration: ...}, or nil when the
#   loop is not running
def pause
  if running?
    @state.pause!
    {status: "paused", iteration: @state.iteration}
  end
end

#resume ⇒ Object

Resume execution



83
84
85
86
87
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 83

# Resumes the work loop if the state reports it as paused.
#
# @return [Hash, nil] {status: "resumed", iteration: ...}, or nil when the
#   state is not paused
def resume
  if @state.paused?
    @state.resume!
    {status: "resumed", iteration: @state.iteration}
  end
end

#running? ⇒ Boolean

Check if work loop is running

Returns:

  • (Boolean)


71
72
73
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 71

# Reports whether the work loop is actively running.
#
# Truthy only when a worker thread reference is held, that thread is alive,
# and the state object reports running.
#
# @return [Boolean] false (never nil) when there is no worker thread, the
#   thread is dead, or the state is not running
def running?
  # Safe navigation yields nil when no thread is held; `|| false` keeps the
  # documented Boolean contract, matching how `status` reports thread_alive.
  (@work_thread&.alive? && @state.running?) || false
end

#status ⇒ Object

Get current status



125
126
127
128
129
130
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 125

# Builds a status report from the state summary.
#
# The object returned by @state.summary is extended in place with:
#   :queued_instructions - the instruction queue's summary
#   :thread_alive        - true/false liveness of the worker thread
#
# @return [Object] the augmented summary
def status
  @state.summary.tap do |report|
    report[:queued_instructions] = @instruction_queue.summary
    report[:thread_alive] = @work_thread&.alive? || false
  end
end

#wait ⇒ Object

Wait for work loop to complete (blocking)



64
65
66
67
68
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 64

# Blocks until the background work loop finishes.
#
# @return [Object, nil] the value of build_final_result once the worker
#   thread has been joined, or nil when no worker thread reference is held
def wait
  # Snapshot the reference once: the worker thread clears @work_thread when
  # it finishes, so reading the ivar a second time could yield nil and make
  # the join call raise NoMethodError.
  worker = @work_thread
  return unless worker

  worker.join
  build_final_result
end