Module: Aidp::Concurrency::Exec
Defined in: lib/aidp/concurrency/exec.rb
Overview
Centralized executor and thread pool management.
Provides named, configured executors for different workload types (I/O-bound, CPU-bound, background tasks) with standardized error handling and instrumentation.
Class Attribute Summary
- .default_pool ⇒ Concurrent::ThreadPoolExecutor
  Get the default executor pool.
- .pools=(value) ⇒ Object (writeonly)
  Expose for testability - reset pool cache between tests.
Class Method Summary
- .future(executor: nil) { ... } ⇒ Concurrent::Promises::Future
  Execute a block as a future on the given executor, or on the default pool when none is given.
- .pool(name:, size: nil, type: :fixed) ⇒ Concurrent::ThreadPoolExecutor
  Get or create a named thread pool.
- .shutdown_all(timeout: 60) ⇒ Hash
  Shut down all managed pools.
- .shutdown_pool(name, timeout: 60) ⇒ Boolean
  Shut down a specific pool.
- .stats ⇒ Hash
  Get statistics for all pools.
- .zip(*futures) ⇒ Concurrent::Promises::Future
  Combine multiple futures into a single future that resolves once all of them have resolved.
Class Attribute Details
.default_pool ⇒ Concurrent::ThreadPoolExecutor
Get the default executor pool.
    # File 'lib/aidp/concurrency/exec.rb', line 80
    def default_pool
      @default_pool ||= pool(name: :default, size: default_pool_size)
    end
.pools=(value) ⇒ Object (writeonly)
Expose for testability - reset pool cache between tests
    # File 'lib/aidp/concurrency/exec.rb', line 25
    def pools=(value)
      @pools = value
    end
Class Method Details
.future(executor: nil) { ... } ⇒ Concurrent::Promises::Future
Execute a block as a future on the given executor, or on the default pool when none is given. Raises ArgumentError if no block is passed.
    # File 'lib/aidp/concurrency/exec.rb', line 54
    def future(executor: nil, &block)
      raise ArgumentError, "Block required" unless block_given?

      executor ||= default_pool
      Concurrent::Promises.future_on(executor, &block)
    end
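The block guard and the executor fallback can be illustrated without concurrent-ruby. The following is only a sketch under stated assumptions: `ThreadExecutor` and `DEFAULT_EXECUTOR` are hypothetical stand-ins (a plain `Thread` plays the role of the future), not part of Aidp or concurrent-ruby.

```ruby
# Hypothetical stand-in executor: runs each task on a new thread.
class ThreadExecutor
  attr_reader :label

  def initialize(label)
    @label = label
  end

  # The thread's value pairs this executor's label with the block's result,
  # so the caller can see which executor ran the block.
  def post(&block)
    Thread.new { [label, block.call] }
  end
end

# Hypothetical default, standing in for the default pool.
DEFAULT_EXECUTOR = ThreadExecutor.new(:default)

# Same shape as the method above: require a block, then fall back to the
# default executor when none is passed.
def future(executor: nil, &block)
  raise ArgumentError, "Block required" unless block_given?

  executor ||= DEFAULT_EXECUTOR
  executor.post(&block)
end

on_default = future { 1 + 1 }.value
on_custom  = future(executor: ThreadExecutor.new(:io)) { 2 + 2 }.value

# Calling without a block raises before any executor is touched.
missing_block_error =
  begin
    future
  rescue ArgumentError => e
    e.message
  end

p on_default
p on_custom
p missing_block_error
```

With the real module, the return value would be a `Concurrent::Promises::Future` rather than a thread, so the result is read from the future instead of from `Thread#value`.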
.pool(name:, size: nil, type: :fixed) ⇒ Concurrent::ThreadPoolExecutor
Get or create a named thread pool.
Pools are cached by name. Calling this method multiple times with the same name returns the same pool instance.
    # File 'lib/aidp/concurrency/exec.rb', line 40
    def pool(name:, size: nil, type: :fixed)
      @pools ||= Concurrent::Hash.new
      @pools[name] ||= create_pool(name, size, type)
    end
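The caching behaviour can be sketched with the standard library alone. This is a minimal illustration, assuming a hypothetical `PoolRegistry` module and a `Struct`-based `FakePool` in place of a real thread pool; neither name exists in Aidp or concurrent-ruby.

```ruby
# Hypothetical stand-in for a real thread pool.
FakePool = Struct.new(:name, :size, :type)

# Hypothetical registry illustrating get-or-create by name.
module PoolRegistry
  @pools = {}
  @lock = Mutex.new

  class << self
    # Return the cached pool for `name`, creating it on first use.
    def pool(name:, size: nil, type: :fixed)
      @lock.synchronize do
        @pools[name] ||= FakePool.new(name, size, type)
      end
    end
  end
end

io_a = PoolRegistry.pool(name: :io, size: 4)
io_b = PoolRegistry.pool(name: :io, size: 16) # :io already exists, so size: 16 is ignored
cpu  = PoolRegistry.pool(name: :cpu, size: 2)

puts io_a.equal?(io_b) # same cached instance
puts io_a.size         # size from the first call
puts io_a.equal?(cpu)  # different name, different pool
```

Because `||=` short-circuits once an entry exists, `size` and `type` only take effect on the first call for a given name. The same holds for the method shown above.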
.shutdown_all(timeout: 60) ⇒ Hash
Shut down all managed pools. Returns a hash that maps each pool name to the result of shutting that pool down.
    # File 'lib/aidp/concurrency/exec.rb', line 102
    def shutdown_all(timeout: 60)
      @pools ||= Concurrent::Hash.new
      results = {}

      @pools.each_key do |name|
        results[name] = shutdown_pool(name, timeout: timeout)
      end

      @default_pool&.shutdown
      @default_pool&.wait_for_termination(timeout)

      results
    end
.shutdown_pool(name, timeout: 60) ⇒ Boolean
Shut down a specific pool. Returns true if no pool is registered under the given name. Otherwise returns the result of waiting up to timeout seconds for the pool to terminate.
    # File 'lib/aidp/concurrency/exec.rb', line 89
    def shutdown_pool(name, timeout: 60)
      @pools ||= Concurrent::Hash.new
      pool = @pools.delete(name)
      return true unless pool

      pool.shutdown
      pool.wait_for_termination(timeout)
    end
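The three outcomes (clean shutdown, unknown name, timeout) can be illustrated with a small stand-in pool. This is a sketch only: `TinyPool` and the `POOLS` hash are hypothetical, built on `Queue` and `Thread`, and merely mimic the `shutdown` / `wait_for_termination(timeout)` pair used above.

```ruby
# Hypothetical minimal pool: worker threads draining a shared queue.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty.
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  def post(&task)
    @queue << task
  end

  # Stop accepting new work; tasks already queued still run.
  def shutdown
    @queue.close
  end

  # True if every worker finished within `timeout` seconds, false otherwise.
  def wait_for_termination(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @workers.all? do |worker|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      worker.join([remaining, 0].max) # nil on timeout
    end
  end
end

POOLS = { io: TinyPool.new(2), slow: TinyPool.new(1) }

# Same control flow as the method above, against the hypothetical POOLS hash.
def shutdown_pool(name, timeout: 60)
  pool = POOLS.delete(name)
  return true unless pool

  pool.shutdown
  pool.wait_for_termination(timeout)
end

POOLS[:io].post { :quick }
POOLS[:slow].post { sleep 0.5 }

clean     = shutdown_pool(:io, timeout: 5)      # workers finish in time
unknown   = shutdown_pool(:io, timeout: 5)      # already removed from POOLS
timed_out = shutdown_pool(:slow, timeout: 0.05) # task outlives the timeout

p [clean, unknown, timed_out]
```

Note that an unknown name and a clean shutdown both yield a truthy result, so the return value alone does not tell the caller whether a pool was actually found.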
.stats ⇒ Hash
Get statistics for all pools.
    # File 'lib/aidp/concurrency/exec.rb', line 119
    def stats
      @pools ||= Concurrent::Hash.new
      stats = {}

      @pools.each do |name, pool|
        stats[name] = pool_stats(pool)
      end

      stats[:default] = pool_stats(@default_pool) if @default_pool

      stats
    end
.zip(*futures) ⇒ Concurrent::Promises::Future
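Combine multiple futures into a single future that resolves once all of them have resolved. The call itself does not block; waiting happens when the combined future's value is read.
    # File 'lib/aidp/concurrency/exec.rb', line 73
    def zip(*futures)
      Concurrent::Promises.zip(*futures)
    end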
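The idea of zipping can be sketched with plain threads standing in for futures. This is an illustration under that assumption, not the concurrent-ruby implementation: the hypothetical `zip` below returns a new thread whose value is the array of the inputs' values, in argument order.

```ruby
# Hypothetical stand-in: a Thread plays the role of a future, and
# Thread#value plays the role of reading the future's result.
def zip(*futures)
  # Returns immediately; the new thread waits for each input in turn.
  Thread.new { futures.map(&:value) }
end

slow = Thread.new do
  sleep 0.05
  :slow_result
end
fast = Thread.new { :fast_result }

combined = zip(slow, fast) # does not block
values = combined.value    # blocks until both inputs have finished

p values
```

Results come back in the order the futures were passed, not in the order they finished.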