Class: Aidp::Execute::AsyncWorkLoopRunner

Inherits:
Object
  • Object
show all
Includes:
RescueLogging
Defined in:
lib/aidp/execute/async_work_loop_runner.rb

Overview

Asynchronous wrapper around WorkLoopRunner. Runs the work loop in a separate thread while maintaining REPL responsiveness.

Responsibilities:

  • Execute work loop in background thread

  • Monitor execution state (pause, resume, cancel)

  • Merge queued instructions at iteration boundaries

  • Stream output to main thread for display

  • Handle graceful cancellation with checkpoint save

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RescueLogging

__log_rescue_impl, log_rescue, #log_rescue

Constructor Details

#initialize(project_dir, provider_manager, config, options = {}) ⇒ AsyncWorkLoopRunner

Returns a new instance of AsyncWorkLoopRunner.



27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 27

# Sets up a runner with a fresh WorkLoopState and InstructionQueue.
# No thread is started here; @work_thread and @sync_runner begin as nil.
#
# @param project_dir [Object] stored unchanged in @project_dir
# @param provider_manager [Object] stored unchanged in @provider_manager
# @param config [Object] stored unchanged in @config
# @param options [Hash] stored unchanged in @options; two keys are read here:
#   :cancel_timeout (falls back to 5 when absent or falsy) and
#   :sync_runner_class (falls back to WorkLoopRunner when absent or falsy)
def initialize(project_dir, provider_manager, config, options = {})
  @project_dir, @provider_manager, @config, @options =
    project_dir, provider_manager, config, options

  requested_timeout = options[:cancel_timeout]
  @cancel_timeout = requested_timeout || 5 # seconds to wait for graceful shutdown

  # Allows a different runner class to be supplied through the options hash.
  injected_runner_class = options[:sync_runner_class]
  @sync_runner_class = injected_runner_class || WorkLoopRunner

  @state = WorkLoopState.new
  @instruction_queue = InstructionQueue.new
  @work_thread = @sync_runner = nil
end

Instance Attribute Details

#instruction_queueObject (readonly)

Returns the value of attribute instruction_queue.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Reader for the queue object held in @instruction_queue.
#
# @return [Object] the current value of @instruction_queue
def instruction_queue = @instruction_queue

#stateObject (readonly)

Returns the value of attribute state.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Reader for the state object held in @state.
#
# @return [Object] the current value of @state
def state = @state

#sync_runnerObject

Expose sync_runner for testability



25
26
27
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 25

# Reader for @sync_runner, exposed for testability.
#
# @return [Object, nil] the current value of @sync_runner
def sync_runner = @sync_runner

#work_threadObject (readonly)

Returns the value of attribute work_thread.



22
23
24
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 22

# Reader for the background thread reference held in @work_thread.
#
# @return [Thread, nil] the current value of @work_thread
def work_thread = @work_thread

Instance Method Details

#cancel(save_checkpoint: true) ⇒ Object

Cancel execution gracefully



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 93

# Requests cancellation of the work loop and optionally saves a checkpoint.
#
# Returns nil without doing anything when the state already reports
# cancelled or completed. Otherwise it flags the state as cancelled, waits up
# to @cancel_timeout seconds for the worker thread (if one is set), and then
# saves a checkpoint when asked to and a sync runner is present.
#
# @param save_checkpoint [Boolean] whether to call save_cancellation_checkpoint
# @return [Hash, nil] {status: "cancelled", iteration: @state.iteration},
#   or nil when there was nothing to cancel
def cancel(save_checkpoint: true)
  already_finished = @state.cancelled? || @state.completed?
  return if already_finished

  @state.cancel!
  @state.append_output("Cancellation requested, waiting for safe stopping point...", type: :warning)

  # Bounded wait so the caller is never blocked longer than the timeout.
  worker = @work_thread
  worker.join(@cancel_timeout) unless worker.nil?

  checkpoint_due = save_checkpoint && @sync_runner
  if checkpoint_due
    @state.append_output("Saving checkpoint before exit...", type: :info)
    save_cancellation_checkpoint
  end

  {status: "cancelled", iteration: @state.iteration}
end

#drain_outputObject

Get streaming output



123
124
125
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 123

# Hands back whatever @state.drain_output returns (the streaming output).
#
# @return [Object] the result of @state.drain_output
def drain_output = @state.drain_output

#enqueue_instruction(content, type: :user_input, priority: :normal) ⇒ Object

Add instruction to queue (will be merged at next iteration)



111
112
113
114
115
116
117
118
119
120
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 111

# Queues an instruction and records it on the state object.
#
# The content is passed to @instruction_queue.enqueue (with type and
# priority) and then to @state.enqueue_instruction.
#
# @param content [Object] the instruction payload
# @param type [Symbol] forwarded to the queue (default :user_input)
# @param priority [Symbol] forwarded to the queue (default :normal)
# @return [Hash] status, the queue's count after enqueueing, and a message
def enqueue_instruction(content, type: :user_input, priority: :normal)
  @instruction_queue.enqueue(content, type: type, priority: priority)
  @state.enqueue_instruction(content)

  pending = @instruction_queue.count
  {status: "enqueued", queued_count: pending, message: "Instruction will be merged in next iteration"}
end

#execute_step_async(step_name, step_spec, context = {}) ⇒ Object

Start async work loop execution. Returns immediately; work continues in a background thread.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 42

# Starts the work loop on a background thread and returns right away.
#
# The step name, spec and context are stored in instance variables before the
# thread is created. Inside the thread, any StandardError raised by
# run_async_loop is logged via log_rescue, recorded with @state.error!, and
# appended to the output stream; the thread always clears @work_thread when
# it finishes.
#
# @param step_name [Object] stored in @step_name
# @param step_spec [Object] stored in @step_spec
# @param context [Hash] stored in @context (default {})
# @return [Hash] {status: "started", state: @state.summary}
# @raise [WorkLoopState::StateError] when @state.idle? is false
def execute_step_async(step_name, step_spec, context = {})
  unless @state.idle?
    raise WorkLoopState::StateError, "Work loop already running"
  end

  @state.start!
  @step_name, @step_spec, @context = step_name, step_spec, context

  @work_thread = Thread.new do
    begin
      run_async_loop
    rescue => error
      log_rescue(error, component: "async_work_loop_runner", action: "thread_execution", fallback: "error_state", step: @step_name)
      @state.error!(error)
      @state.append_output("Work loop error: #{error.message}", type: :error)
    ensure
      # The worker drops its own reference once it is done.
      @work_thread = nil
    end
  end

  # Allow thread to start
  Thread.pass

  {status: "started", state: @state.summary}
end

#pauseObject

Pause execution



79
80
81
82
83
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 79

# Pauses the work loop, but only when running? reports true.
#
# @return [Hash, nil] {status: "paused", iteration: @state.iteration},
#   or nil when the loop is not running
def pause
  if running?
    @state.pause!
    {status: "paused", iteration: @state.iteration}
  end
end

#resumeObject

Resume execution



86
87
88
89
90
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 86

# Resumes the work loop, but only when the state reports paused.
#
# @return [Hash, nil] {status: "resumed", iteration: @state.iteration},
#   or nil when the state is not paused
def resume
  if @state.paused?
    @state.resume!
    {status: "resumed", iteration: @state.iteration}
  end
end

#running?Boolean

Check if work loop is running

Returns:

  • (Boolean)


74
75
76
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 74

# Reports whether the work loop is currently running.
#
# True only when a worker thread is set, that thread is alive, and the state
# object reports running. The result is coerced to a strict boolean so that a
# missing thread yields false rather than nil, matching the `|| false`
# coercion that #status applies to :thread_alive.
#
# @return [Boolean] true or false, never nil
def running?
  # `&.` reads @work_thread once, so the worker clearing it cannot race us.
  !!(@work_thread&.alive? && @state.running?)
end

#statusObject

Get current status



128
129
130
131
132
133
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 128

# Builds a status report from the state summary.
#
# Two keys are added to the object returned by @state.summary:
# :queued_instructions (from @instruction_queue.summary) and :thread_alive
# (false when there is no worker thread).
#
# @return [Hash] the augmented summary
def status
  @state.summary.tap do |report|
    report[:queued_instructions] = @instruction_queue.summary
    worker = @work_thread
    report[:thread_alive] = worker.nil? ? false : (worker.alive? || false)
  end
end

#waitObject

Wait for work loop to complete (blocking)



67
68
69
70
71
# File 'lib/aidp/execute/async_work_loop_runner.rb', line 67

# Blocks until the work loop thread finishes, then returns the final result.
#
# The thread reference is captured in a local first: the worker sets
# @work_thread to nil when it finishes, so reading the instance variable a
# second time for the join could hit nil and raise NoMethodError.
#
# @return [Object, nil] the value of build_final_result, or nil when no
#   worker thread is set at the time of the call
def wait
  thread = @work_thread
  return unless thread

  thread.join
  build_final_result
end