Class: Aidp::Execute::AsyncWorkLoopRunner
- Inherits:
-
Object
- Object
- Aidp::Execute::AsyncWorkLoopRunner
- Includes:
- RescueLogging
- Defined in:
- lib/aidp/execute/async_work_loop_runner.rb
Overview
Asynchronous wrapper around WorkLoopRunner Runs work loop in a separate thread while maintaining REPL responsiveness
Responsibilities:
-
Execute work loop in background thread
-
Monitor execution state (pause, resume, cancel)
-
Merge queued instructions at iteration boundaries
-
Stream output to main thread for display
-
Handle graceful cancellation with checkpoint save
Instance Attribute Summary collapse
-
#instruction_queue ⇒ Object
readonly
Returns the value of attribute instruction_queue.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#work_thread ⇒ Object
readonly
Returns the value of attribute work_thread.
Instance Method Summary collapse
-
#cancel(save_checkpoint: true) ⇒ Object
Cancel execution gracefully.
-
#drain_output ⇒ Object
Get streaming output.
-
#enqueue_instruction(content, type: :user_input, priority: :normal) ⇒ Object
Add instruction to queue (will be merged at next iteration).
-
#execute_step_async(step_name, step_spec, context = {}) ⇒ Object
Start async work loop execution Returns immediately, work continues in background thread.
-
#initialize(project_dir, provider_manager, config, options = {}) ⇒ AsyncWorkLoopRunner
constructor
A new instance of AsyncWorkLoopRunner.
-
#pause ⇒ Object
Pause execution.
-
#resume ⇒ Object
Resume execution.
-
#running? ⇒ Boolean
Check if work loop is running.
-
#status ⇒ Object
Get current status.
-
#wait ⇒ Object
Wait for work loop to complete (blocking).
Methods included from RescueLogging
__log_rescue_impl, log_rescue, #log_rescue
Constructor Details
#initialize(project_dir, provider_manager, config, options = {}) ⇒ AsyncWorkLoopRunner
Returns a new instance of AsyncWorkLoopRunner.
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 24 def initialize(project_dir, provider_manager, config, = {}) @project_dir = project_dir @provider_manager = provider_manager @config = config @options = @cancel_timeout = [:cancel_timeout] || 5 # seconds to wait for graceful shutdown @sync_runner_class = [:sync_runner_class] || WorkLoopRunner @state = WorkLoopState.new @instruction_queue = InstructionQueue.new @work_thread = nil @sync_runner = nil end |
Instance Attribute Details
#instruction_queue ⇒ Object (readonly)
Returns the value of attribute instruction_queue.
22 23 24 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22 def instruction_queue @instruction_queue end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
22 23 24 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22 def state @state end |
#work_thread ⇒ Object (readonly)
Returns the value of attribute work_thread.
22 23 24 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22 def work_thread @work_thread end |
Instance Method Details
#cancel(save_checkpoint: true) ⇒ Object
Cancel execution gracefully
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 90 def cancel(save_checkpoint: true) return if @state.cancelled? || @state.completed? @state.cancel! @state.append_output("Cancellation requested, waiting for safe stopping point...", type: :warning) # Wait for thread to notice cancellation @work_thread&.join(@cancel_timeout) if save_checkpoint && @sync_runner @state.append_output("Saving checkpoint before exit...", type: :info) save_cancellation_checkpoint end {status: "cancelled", iteration: @state.iteration} end |
#drain_output ⇒ Object
Get streaming output
120 121 122 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 120 def drain_output @state.drain_output end |
#enqueue_instruction(content, type: :user_input, priority: :normal) ⇒ Object
Add instruction to queue (will be merged at next iteration)
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 108 def enqueue_instruction(content, type: :user_input, priority: :normal) @instruction_queue.enqueue(content, type: type, priority: priority) @state.enqueue_instruction(content) { status: "enqueued", queued_count: @instruction_queue.count, message: "Instruction will be merged in next iteration" } end |
#execute_step_async(step_name, step_spec, context = {}) ⇒ Object
Start async work loop execution Returns immediately, work continues in background thread
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 39 def execute_step_async(step_name, step_spec, context = {}) raise WorkLoopState::StateError, "Work loop already running" unless @state.idle? @state.start! @step_name = step_name @step_spec = step_spec @context = context @work_thread = Thread.new do run_async_loop rescue => e log_rescue(e, component: "async_work_loop_runner", action: "thread_execution", fallback: "error_state", step: @step_name) @state.error!(e) @state.append_output("Work loop error: #{e.}", type: :error) ensure @work_thread = nil end # Allow thread to start Thread.pass {status: "started", state: @state.summary} end |
#pause ⇒ Object
Pause execution
76 77 78 79 80 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 76 def pause return unless running? @state.pause! {status: "paused", iteration: @state.iteration} end |
#resume ⇒ Object
Resume execution
83 84 85 86 87 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 83 def resume return unless @state.paused? @state.resume! {status: "resumed", iteration: @state.iteration} end |
#running? ⇒ Boolean
Check if work loop is running
71 72 73 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 71 def running? @work_thread&.alive? && @state.running? end |
#status ⇒ Object
Get current status
125 126 127 128 129 130 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 125 def status summary = @state.summary summary[:queued_instructions] = @instruction_queue.summary summary[:thread_alive] = @work_thread&.alive? || false summary end |
#wait ⇒ Object
Wait for work loop to complete (blocking)
64 65 66 67 68 |
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 64 def wait return unless @work_thread @work_thread.join build_final_result end |