Class: Aidp::WorkstreamExecutor

Inherits:
Object
Includes:
MessageDisplay
Defined in:
lib/aidp/workstream_executor.rb

Overview

Executes multiple workstreams in parallel using concurrent-ruby. Provides true parallel execution with process isolation and status tracking.

Defined Under Namespace

Classes: WorkstreamResult

Constant Summary

Constants included from MessageDisplay

MessageDisplay::COLOR_MAP, MessageDisplay::CRITICAL_TYPES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from MessageDisplay

#display_message, included, #message_display_prompt, #quiet_mode?

Constructor Details

#initialize(project_dir: Dir.pwd, max_concurrent: 3, runner_factory: nil) ⇒ WorkstreamExecutor

Returns a new instance of WorkstreamExecutor.

Parameters:

  • project_dir (String) (defaults to: Dir.pwd)

    root directory of the project

  • max_concurrent (Integer) (defaults to: 3)

    maximum number of concurrent workstreams (thread pool size)

  • runner_factory (Proc) (defaults to: nil)

    factory that builds a harness runner. Signature: (path, mode, options) => object responding to #run



30
31
32
33
34
35
36
37
38
# File 'lib/aidp/workstream_executor.rb', line 30

# Sets up an executor. Only stores configuration and creates the empty
# bookkeeping hashes.
#
# @param project_dir [String] root directory of the project
# @param max_concurrent [Integer] maximum number of concurrent workstreams
#   (thread pool size)
# @param runner_factory [Proc, nil] factory that builds a harness runner;
#   called as (path, mode, options) and must return an object responding to
#   #run. When nil, Aidp::Harness::Runner.new is used.
def initialize(project_dir: Dir.pwd, max_concurrent: 3, runner_factory: nil)
  default_factory = ->(path, mode, options) { Aidp::Harness::Runner.new(path, mode, options) }

  @runner_factory = runner_factory || default_factory
  @start_times = Concurrent::Hash.new
  @results = Concurrent::Hash.new
  @max_concurrent = max_concurrent
  @project_dir = project_dir
end

Instance Attribute Details

#max_concurrent ⇒ Object (readonly)

Expose for testability



13
14
15
# File 'lib/aidp/workstream_executor.rb', line 13

# Reader for the concurrency limit given to the constructor.
#
# @return [Integer] maximum number of concurrent workstreams
def max_concurrent = @max_concurrent

#project_dir ⇒ Object (readonly)

Expose for testability



13
14
15
# File 'lib/aidp/workstream_executor.rb', line 13

# Reader for the project root given to the constructor.
#
# @return [String] root directory of the project
def project_dir = @project_dir

#results ⇒ Object (readonly)

Expose for testability



13
14
15
# File 'lib/aidp/workstream_executor.rb', line 13

# Reader for the per-workstream results collected so far, keyed by slug.
#
# @return [Concurrent::Hash] the live hash, not a copy
def results = @results

#start_times ⇒ Object (readonly)

Expose for testability



13
14
15
# File 'lib/aidp/workstream_executor.rb', line 13

# Reader for the recorded start time of each workstream, keyed by slug.
#
# @return [Concurrent::Hash] the live hash, not a copy
def start_times = @start_times

Instance Method Details

#execute_all(options = {}) ⇒ Array<WorkstreamResult>

Execute all active workstreams in parallel

Parameters:

  • options (Hash) (defaults to: {})

    Execution options (same as execute_parallel)

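Returns:

  • (Array<WorkstreamResult>)

    one result per active workstream, or an empty array when none are active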



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/aidp/workstream_executor.rb', line 78

# Run every workstream that the worktree listing reports as active.
#
# @param options [Hash] execution options, passed straight through to
#   #execute_parallel
# @return [Array<WorkstreamResult>] whatever #execute_parallel returns, or an
#   empty array (after a warning) when no workstream is active
def execute_all(options = {})
  active_slugs = Aidp::Worktree
    .list(project_dir: @project_dir)
    .select { |entry| entry[:active] }
    .map { |entry| entry[:slug] }

  return execute_parallel(active_slugs, options) unless active_slugs.empty?

  display_message("⚠️  No active workstreams found", type: :warn)
  []
end

#execute_parallel(slugs, options = {}) ⇒ Array<WorkstreamResult>

Execute multiple workstreams in parallel

Parameters:

  • slugs (Array<String>)

    Workstream slugs to execute

  • options (Hash) (defaults to: {})

    Execution options

Options Hash (options):

  • :selected_steps (Array<String>)

    Steps to execute

  • :workflow_type (Symbol)

    Workflow type (:execute, :analyze, etc.)

  • :user_input (Hash)

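    User input for harness

  • :mode (Symbol)

    Runner mode passed to the runner factory by execute_workstream (defaults to :execute)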

Returns:

  • (Array<WorkstreamResult>)

    one result per slug, in the order the slugs were given



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/aidp/workstream_executor.rb', line 48

# Execute multiple workstreams in parallel.
#
# Every slug is submitted as a Concurrent::Future to a fixed-size thread pool
# of @max_concurrent threads, and this method blocks until each future has
# produced its value.
#
# @param slugs [Array<String>] workstream slugs to execute
# @param options [Hash] execution options, forwarded unchanged to
#   #execute_workstream for every slug
# @option options [Array<String>] :selected_steps steps to execute
# @option options [Symbol] :workflow_type workflow type (:execute, :analyze, etc.)
# @option options [Hash] :user_input user input for harness
# @return [Array<WorkstreamResult>] one entry per slug, in the order of +slugs+
def execute_parallel(slugs, options = {})
  # NOTE(review): validate_workstreams! is defined elsewhere; presumably it
  # raises for unknown slugs — confirm.
  validate_workstreams!(slugs)

  display_message("🚀 Starting parallel execution of #{slugs.size} workstreams (max #{@max_concurrent} concurrent)", type: :info)

  # Create thread pool with max concurrent limit
  pool = Concurrent::FixedThreadPool.new(@max_concurrent)

  begin
    # Create futures for each workstream
    futures = slugs.map do |slug|
      Concurrent::Future.execute(executor: pool) { execute_workstream(slug, options) }
    end

    # Wait for all futures to complete
    results = futures.map(&:value)
  ensure
    # Always ask the pool to stop, even if submitting or awaiting a future
    # raised; otherwise its worker threads would be left behind.
    pool.shutdown
  end

  # Normal path only: give the already-finished workers time to exit.
  pool.wait_for_termination(30)

  display_execution_summary(results)
  results
end

#execute_workstream(slug, options = {}) ⇒ WorkstreamResult

Execute a single workstream (used by futures in parallel execution)

Parameters:

  • slug (String)

    Workstream slug

  • options (Hash) (defaults to: {})

    Execution options

Returns:

  • (WorkstreamResult)

    outcome of the run, with status "completed", "failed", or "error"



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/aidp/workstream_executor.rb', line 95

# Execute a single workstream (used by futures in parallel execution).
#
# Looks the workstream up, marks it "active" in WorkstreamState, runs the
# harness runner in a forked child process, then waits for that child and
# turns its exit status into a WorkstreamResult.
#
# @param slug [String] workstream slug
# @param options [Hash] execution options; +options[:mode]+ selects the runner
#   mode (defaults to +:execute+) and the whole hash is handed to the runner
#   factory
# @return [WorkstreamResult] status is "completed" or "failed" according to
#   the child's exit status, or "error" when the workstream is unknown or a
#   StandardError is raised in the parent. Only the "completed"/"failed"
#   results are stored in @results.
def execute_workstream(slug, options = {})
  started_at = Time.now
  @start_times[slug] = started_at

  workstream = Aidp::Worktree.info(slug: slug, project_dir: @project_dir)
  unless workstream
    return WorkstreamResult.new(
      slug: slug,
      status: "error",
      exit_code: 1,
      started_at: started_at,
      completed_at: Time.now,
      duration: 0,
      error: "Workstream not found"
    )
  end

  display_message("▶️  [#{slug}] Starting execution in #{workstream[:path]}", type: :info)

  # Update workstream state to active
  Aidp::WorkstreamState.update(
    slug: slug,
    project_dir: @project_dir,
    status: "active",
    started_at: started_at.utc.iso8601
  )

  # Execute in forked process for true isolation
  pid = fork do
    # Change to workstream directory
    Dir.chdir(workstream[:path])

    # Execute harness
    runner = @runner_factory.call(
      workstream[:path],
      options[:mode] || :execute,
      options
    )

    run_result = runner.run

    # Update state on completion
    finished_ok = run_result[:status] == "completed"

    Aidp::WorkstreamState.update(
      slug: slug,
      project_dir: @project_dir,
      status: finished_ok ? "completed" : "failed",
      completed_at: Time.now.utc.iso8601
    )

    # exit raises SystemExit, which the StandardError rescue below does not catch.
    exit(finished_ok ? 0 : 1)
  rescue => e
    # Update state on error
    Aidp::WorkstreamState.update(
      slug: slug,
      project_dir: @project_dir,
      status: "failed",
      completed_at: Time.now.utc.iso8601
    )

    # Log error and exit
    # Suppress backtrace noise during tests while keeping it for production debugging
    unless ENV["RSPEC_RUNNING"] == "true"
      warn("Error in workstream #{slug}: #{e.message}")
      warn(e.backtrace.first(5).join("\n"))
    end
    exit(1)
  end

  # Wait for child process
  _pid, status = Process.wait2(pid)
  completed_at = Time.now
  duration = completed_at - started_at

  # Build result
  succeeded = status.success?
  result_status = succeeded ? "completed" : "failed"
  failure_reason =
    if succeeded
      nil
    elsif status.signaled?
      # A child killed by a signal has no exit code (exitstatus is nil), so
      # report the signal instead of an empty "code".
      "Process terminated by signal #{status.termsig}"
    else
      "Process exited with code #{status.exitstatus}"
    end

  result = WorkstreamResult.new(
    slug: slug,
    status: result_status,
    exit_code: status.exitstatus,
    started_at: started_at,
    completed_at: completed_at,
    duration: duration,
    error: failure_reason
  )

  @results[slug] = result

  icon = succeeded ? "✅" : "❌"
  display_message("#{icon} [#{slug}] #{result_status.capitalize} in #{format_duration(duration)}", type: succeeded ? :success : :error)

  result
rescue => e
  # Anything that fails in the parent (lookup, state update, fork, wait) is
  # reported as an "error" result instead of propagating to the caller.
  completed_at = Time.now
  duration = completed_at - started_at

  WorkstreamResult.new(
    slug: slug,
    status: "error",
    exit_code: 1,
    started_at: started_at,
    completed_at: completed_at,
    duration: duration,
    error: e.message
  )
end