Module: Aidp::Concurrency::Exec
Defined in: lib/aidp/concurrency/exec.rb
Overview
Centralized executor and thread pool management.
Provides named, configured executors for different workload types (I/O-bound, CPU-bound, background tasks) with standardized error handling and instrumentation.
Class Method Summary
- .default_pool ⇒ Concurrent::ThreadPoolExecutor
  Get the default executor pool.
- .future(executor: nil) { ... } ⇒ Concurrent::Promises::Future
  Execute a block as a future on the given executor, or on the default pool when no executor is given.
- .pool(name:, size: nil, type: :fixed) ⇒ Concurrent::ThreadPoolExecutor
  Get or create a named thread pool.
- .shutdown_all(timeout: 60) ⇒ Hash
  Shut down all managed pools.
- .shutdown_pool(name, timeout: 60) ⇒ Boolean
  Shut down a specific pool.
- .stats ⇒ Hash
  Get statistics for all pools.
- .zip(*futures) ⇒ Concurrent::Promises::Future
  Combine multiple futures into a single future that resolves once all of them have completed.
Class Method Details
.default_pool ⇒ Concurrent::ThreadPoolExecutor
Get the default executor pool.
    # File 'lib/aidp/concurrency/exec.rb', line 77
    def default_pool
      @default_pool ||= pool(name: :default, size: default_pool_size)
    end
.future(executor: nil) { ... } ⇒ Concurrent::Promises::Future
Execute a block as a future on the given executor, or on the default pool when no executor is given. Raises ArgumentError if no block is passed.

    # File 'lib/aidp/concurrency/exec.rb', line 51
    def future(executor: nil, &block)
      raise ArgumentError, "Block required" unless block_given?

      executor ||= default_pool
      Concurrent::Promises.future_on(executor, &block)
    end
.pool(name:, size: nil, type: :fixed) ⇒ Concurrent::ThreadPoolExecutor
Get or create a named thread pool.
Pools are cached by name. Calling this method multiple times with the same name returns the same pool instance.
    # File 'lib/aidp/concurrency/exec.rb', line 37
    def pool(name:, size: nil, type: :fixed)
      @pools ||= Concurrent::Hash.new
      @pools[name] ||= create_pool(name, size, type)
    end
.shutdown_all(timeout: 60) ⇒ Hash
Shut down all managed pools. Returns a Hash mapping each pool name to the result of shutting that pool down.

    # File 'lib/aidp/concurrency/exec.rb', line 99
    def shutdown_all(timeout: 60)
      @pools ||= Concurrent::Hash.new
      results = {}

      @pools.each_key do |name|
        results[name] = shutdown_pool(name, timeout: timeout)
      end

      @default_pool&.shutdown
      @default_pool&.wait_for_termination(timeout)

      results
    end
.shutdown_pool(name, timeout: 60) ⇒ Boolean
Shut down a specific pool. Returns true when no pool is registered under the given name; otherwise returns whether the pool terminated within the timeout (in seconds).

    # File 'lib/aidp/concurrency/exec.rb', line 86
    def shutdown_pool(name, timeout: 60)
      @pools ||= Concurrent::Hash.new
      pool = @pools.delete(name)
      return true unless pool

      pool.shutdown
      pool.wait_for_termination(timeout)
    end
.stats ⇒ Hash
Get statistics for all pools.
    # File 'lib/aidp/concurrency/exec.rb', line 116
    def stats
      @pools ||= Concurrent::Hash.new
      stats = {}

      @pools.each do |name, pool|
        stats[name] = pool_stats(pool)
      end

      stats[:default] = pool_stats(@default_pool) if @default_pool

      stats
    end
.zip(*futures) ⇒ Concurrent::Promises::Future
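Combine multiple futures into a single future that resolves once all of them have completed. The call itself does not block; waiting happens when the caller asks the returned future for its value.

    # File 'lib/aidp/concurrency/exec.rb', line 70
    def zip(*futures)
      Concurrent::Promises.zip(*futures)
    end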