Class: Workhorse::Pool
- Inherits:
-
Object
- Object
- Workhorse::Pool
- Defined in:
- lib/workhorse/pool.rb
Overview
Thread pool abstraction used by workers for concurrent job execution. Wraps Concurrent::ThreadPoolExecutor to provide a simpler interface and custom behavior for job processing.
Instance Attribute Summary collapse
-
#active_threads ⇒ Concurrent::AtomicFixnum
readonly
Thread-safe counter of active threads.
-
#mutex ⇒ Mutex
readonly
Synchronization mutex for thread safety.
Instance Method Summary collapse
-
#idle ⇒ Integer
Returns the number of idle threads in the pool.
-
#initialize(size) ⇒ Pool
constructor
Creates a new thread pool with the specified size.
-
#on_idle { ... } ⇒ void
Sets a callback to be executed when the pool becomes idle.
-
#post { ... } ⇒ void
Posts a new work unit to the pool for execution.
-
#shutdown ⇒ void
Shuts down the pool and waits for termination.
-
#wait ⇒ void
Waits until the pool is shut down.
Constructor Details
#initialize(size) ⇒ Pool
Creates a new thread pool with the specified size.
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/workhorse/pool.rb', line 20 def initialize(size) @size = size @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 0, max_threads: @size, max_queue: 0, fallback_policy: :abort, auto_terminate: false ) @mutex = Mutex.new @active_threads = Concurrent::AtomicFixnum.new(0) @on_idle = nil end |
Instance Attribute Details
#active_threads ⇒ Concurrent::AtomicFixnum (readonly)
Returns Thread-safe counter of active threads.
15 16 17 |
# File 'lib/workhorse/pool.rb', line 15 def active_threads @active_threads end |
#mutex ⇒ Mutex (readonly)
Returns Synchronization mutex for thread safety.
12 13 14 |
# File 'lib/workhorse/pool.rb', line 12 def mutex @mutex end |
Instance Method Details
#idle ⇒ Integer
Returns the number of idle threads in the pool.
71 72 73 |
# File 'lib/workhorse/pool.rb', line 71 def idle @size - @active_threads.value end |
#on_idle { ... } ⇒ void
This method returns an undefined value.
Sets a callback to be executed when the pool becomes idle.
38 39 40 |
# File 'lib/workhorse/pool.rb', line 38 def on_idle(&block) @on_idle = block end |
#post { ... } ⇒ void
This method returns an undefined value.
Posts a new work unit to the pool for execution.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/workhorse/pool.rb', line 47 def post mutex.synchronize do if idle.zero? fail 'All threads are busy.' end active_threads = @active_threads active_threads.increment @executor.post do begin # rubocop:disable Style/RedundantBegin yield ensure active_threads.decrement @on_idle.try(:call) end end end end |
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the pool and waits for termination. All currently executing jobs will complete before shutdown.
94 95 96 97 |
# File 'lib/workhorse/pool.rb', line 94 def shutdown @executor.shutdown wait end |
#wait ⇒ void
This method returns an undefined value.
Waits until the pool is shut down. This will wait forever unless you eventually call #shutdown (either before calling wait or after it in another thread).
80 81 82 83 84 85 86 87 88 |
# File 'lib/workhorse/pool.rb', line 80 def wait # Here we use a loop-sleep combination instead of using # ThreadPoolExecutor's `wait_for_termination`. See issue #21 for more # information. loop do break if @executor.shutdown? sleep 0.1 end end |