Class: Aidp::WorkstreamExecutor

Inherits:
Object
  • Object
show all
Includes:
MessageDisplay
Defined in:
lib/aidp/workstream_executor.rb

Overview

Executes multiple workstreams in parallel using concurrent-ruby. Provides true parallel execution with process isolation and status tracking.

Defined Under Namespace

Classes: WorkstreamResult

Constant Summary

Constants included from MessageDisplay

MessageDisplay::COLOR_MAP

Instance Method Summary collapse

Methods included from MessageDisplay

#display_message, included, #message_display_prompt

Constructor Details

#initialize(project_dir: Dir.pwd, max_concurrent: 3, runner_factory: nil) ⇒ WorkstreamExecutor

Returns a new instance of WorkstreamExecutor.

Parameters:

  • project_dir (String) (defaults to: Dir.pwd)

    root directory of the project

  • max_concurrent (Integer) (defaults to: 3)

    maximum number of concurrent workstreams (thread pool size)

  • runner_factory (Proc) (defaults to: nil)

    factory that builds a harness runner. Signature: (path, mode, options) => object responding to #run



31
32
33
34
35
36
37
38
39
# File 'lib/aidp/workstream_executor.rb', line 31

def initialize(project_dir: Dir.pwd, max_concurrent: 3, runner_factory: nil)
  @project_dir = project_dir
  @max_concurrent = max_concurrent
  @results = Concurrent::Hash.new
  @start_times = Concurrent::Hash.new
  @runner_factory = runner_factory || lambda do |path, mode, options|
    Aidp::Harness::Runner.new(path, mode, options)
  end
end

Instance Method Details

#execute_all(options = {}) ⇒ Array<WorkstreamResult>

Execute all active workstreams in parallel

Parameters:

  • options (Hash) (defaults to: {})

    Execution options (same as execute_parallel)

Returns:



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/aidp/workstream_executor.rb', line 79

def execute_all(options = {})
  workstreams = Aidp::Worktree.list(project_dir: @project_dir)
  active_slugs = workstreams.select { |ws| ws[:active] }.map { |ws| ws[:slug] }

  if active_slugs.empty?
    display_message("⚠️  No active workstreams found", type: :warn)
    return []
  end

  execute_parallel(active_slugs, options)
end

#execute_parallel(slugs, options = {}) ⇒ Array<WorkstreamResult>

Execute multiple workstreams in parallel

Parameters:

  • slugs (Array<String>)

    Workstream slugs to execute

  • options (Hash) (defaults to: {})

    Execution options

Options Hash (options):

  • :selected_steps (Array<String>)

    Steps to execute

  • :workflow_type (Symbol)

    Workflow type (:execute, :analyze, etc.)

  • :user_input (Hash)

    User input for harness

Returns:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/aidp/workstream_executor.rb', line 49

def execute_parallel(slugs, options = {})
  validate_workstreams!(slugs)

  display_message("🚀 Starting parallel execution of #{slugs.size} workstreams (max #{@max_concurrent} concurrent)", type: :info)

  # Create thread pool with max concurrent limit
  pool = Concurrent::FixedThreadPool.new(@max_concurrent)

  # Create futures for each workstream
  futures = slugs.map do |slug|
    Concurrent::Future.execute(executor: pool) do
      execute_workstream(slug, options)
    end
  end

  # Wait for all futures to complete
  results = futures.map(&:value)

  # Shutdown pool gracefully
  pool.shutdown
  pool.wait_for_termination(30)

  display_execution_summary(results)
  results
end

#execute_workstream(slug, options = {}) ⇒ WorkstreamResult

Execute a single workstream (used by futures in parallel execution)

Parameters:

  • slug (String)

    Workstream slug

  • options (Hash) (defaults to: {})

    Execution options

Returns:



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/aidp/workstream_executor.rb', line 96

def execute_workstream(slug, options = {})
  started_at = Time.now
  @start_times[slug] = started_at

  workstream = Aidp::Worktree.info(slug: slug, project_dir: @project_dir)
  unless workstream
    return WorkstreamResult.new(
      slug: slug,
      status: "error",
      exit_code: 1,
      started_at: started_at,
      completed_at: Time.now,
      duration: 0,
      error: "Workstream not found"
    )
  end

  display_message("▶️  [#{slug}] Starting execution in #{workstream[:path]}", type: :info)

  # Update workstream state to active
  Aidp::WorkstreamState.update(
    slug: slug,
    project_dir: @project_dir,
    status: "active",
    started_at: started_at.utc.iso8601
  )

  # Execute in forked process for true isolation
  pid = fork do
    # Change to workstream directory
    Dir.chdir(workstream[:path])

    # Execute harness
    runner = @runner_factory.call(
      workstream[:path],
      options[:mode] || :execute,
      options
    )

    result = runner.run

    # Update state on completion
    exit_code = (result[:status] == "completed") ? 0 : 1
    final_status = (result[:status] == "completed") ? "completed" : "failed"

    Aidp::WorkstreamState.update(
      slug: slug,
      project_dir: @project_dir,
      status: final_status,
      completed_at: Time.now.utc.iso8601
    )

    exit(exit_code)
  rescue => e
    # Update state on error
    Aidp::WorkstreamState.update(
      slug: slug,
      project_dir: @project_dir,
      status: "failed",
      completed_at: Time.now.utc.iso8601
    )

    # Log error and exit
    # Suppress backtrace noise during tests while keeping it for production debugging
    unless ENV["RSPEC_RUNNING"] == "true"
      warn("Error in workstream #{slug}: #{e.message}")
      warn(e.backtrace.first(5).join("\n"))
    end
    exit(1)
  end

  # Wait for child process
  _pid, status = Process.wait2(pid)
  completed_at = Time.now
  duration = completed_at - started_at

  # Build result
  result_status = status.success? ? "completed" : "failed"
  result = WorkstreamResult.new(
    slug: slug,
    status: result_status,
    exit_code: status.exitstatus,
    started_at: started_at,
    completed_at: completed_at,
    duration: duration,
    error: status.success? ? nil : "Process exited with code #{status.exitstatus}"
  )

  @results[slug] = result

  display_message("#{status.success? ? "" : ""} [#{slug}] #{result_status.capitalize} in #{format_duration(duration)}", type: status.success? ? :success : :error)

  result
rescue => e
  completed_at = Time.now
  duration = completed_at - started_at

  WorkstreamResult.new(
    slug: slug,
    status: "error",
    exit_code: 1,
    started_at: started_at,
    completed_at: completed_at,
    duration: duration,
    error: e.message
  )
end