Module: Aidp::Concurrency::Exec

Defined in:
lib/aidp/concurrency/exec.rb

Overview

Centralized executor and thread pool management.

Provides named, configured executors for different workload types (I/O-bound, CPU-bound, background tasks) with standardized error handling and instrumentation.

Examples:

Get a named pool

pool = Exec.pool(name: :io_pool, size: 10)
future = Concurrent::Promises.future_on(pool) { fetch_remote_data() }

Execute a future

result = Exec.future { expensive_computation() }.value!

Shutdown all pools

Exec.shutdown_all

Class Method Summary collapse

Class Method Details

.default_poolConcurrent::ThreadPoolExecutor

Get the default executor pool.

Returns:

  • (Concurrent::ThreadPoolExecutor)


77
78
79
# File 'lib/aidp/concurrency/exec.rb', line 77

def default_pool
  @default_pool ||= pool(name: :default, size: default_pool_size)
end

.future(executor: nil) { ... } ⇒ Concurrent::Promises::Future

Execute a block on a future using the default pool.

Examples:

future = Exec.future { slow_operation() }
result = future.value! # Wait for result

Parameters:

  • executor (Concurrent::ExecutorService) (defaults to: nil)

    Custom executor (optional)

Yields:

  • Block to execute asynchronously

Returns:

  • (Concurrent::Promises::Future)

    The future

Raises:

  • (ArgumentError)


51
52
53
54
55
56
# File 'lib/aidp/concurrency/exec.rb', line 51

def future(executor: nil, &block)
  raise ArgumentError, "Block required" unless block_given?

  executor ||= default_pool
  Concurrent::Promises.future_on(executor, &block)
end

.pool(name:, size: nil, type: :fixed) ⇒ Concurrent::ThreadPoolExecutor

Get or create a named thread pool.

Pools are cached by name. Calling this method multiple times with the same name returns the same pool instance.

Examples:

io_pool = Exec.pool(name: :io, size: 20)
cpu_pool = Exec.pool(name: :cpu, size: 4)

Parameters:

  • name (Symbol)

    Pool name (e.g., :io_pool, :cpu_pool, :background)

  • size (Integer) (defaults to: nil)

    Pool size (default: based on pool type)

  • type (Symbol) (defaults to: :fixed)

    Pool type :fixed or :cached (default: :fixed)

Returns:

  • (Concurrent::ThreadPoolExecutor)

    The thread pool



37
38
39
40
# File 'lib/aidp/concurrency/exec.rb', line 37

def pool(name:, size: nil, type: :fixed)
  @pools ||= Concurrent::Hash.new
  @pools[name] ||= create_pool(name, size, type)
end

.shutdown_all(timeout: 60) ⇒ Hash

Shutdown all managed pools.

Parameters:

  • timeout (Float) (defaults to: 60)

    Seconds to wait for each pool

Returns:

  • (Hash)

    Map of pool name to shutdown success



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/aidp/concurrency/exec.rb', line 99

def shutdown_all(timeout: 60)
  @pools ||= Concurrent::Hash.new
  results = {}

  @pools.each_key do |name|
    results[name] = shutdown_pool(name, timeout: timeout)
  end

  @default_pool&.shutdown
  @default_pool&.wait_for_termination(timeout)

  results
end

.shutdown_pool(name, timeout: 60) ⇒ Boolean

Shutdown a specific pool.

Parameters:

  • name (Symbol)

    Pool name

  • timeout (Float) (defaults to: 60)

    Seconds to wait for shutdown

Returns:

  • (Boolean)

    true if shutdown cleanly



86
87
88
89
90
91
92
93
# File 'lib/aidp/concurrency/exec.rb', line 86

def shutdown_pool(name, timeout: 60)
  @pools ||= Concurrent::Hash.new
  pool = @pools.delete(name)
  return true unless pool

  pool.shutdown
  pool.wait_for_termination(timeout)
end

.statsHash

Get statistics for all pools.

Returns:

  • (Hash)

    Map of pool name to stats



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/aidp/concurrency/exec.rb', line 116

def stats
  @pools ||= Concurrent::Hash.new
  stats = {}

  @pools.each do |name, pool|
    stats[name] = pool_stats(pool)
  end

  stats[:default] = pool_stats(@default_pool) if @default_pool

  stats
end

.zip(*futures) ⇒ Concurrent::Promises::Future

Execute multiple futures in parallel and wait for all to complete.

Examples:

futures = [
  Exec.future { task1() },
  Exec.future { task2() },
  Exec.future { task3() }
]
results = Exec.zip(*futures).value!

Parameters:

  • futures (Array<Concurrent::Promises::Future>)

    Futures to zip

Returns:

  • (Concurrent::Promises::Future)

    Future that resolves when all complete



70
71
72
# File 'lib/aidp/concurrency/exec.rb', line 70

def zip(*futures)
  Concurrent::Promises.zip(*futures)
end