Class: Plushie::ThreadPool
- Inherits: Object (Plushie::ThreadPool < Object)
- Defined in: lib/plushie/thread_pool.rb
Overview
Simple bounded thread pool for background work.
Worker threads pull tasks from a shared queue. Sized to CPU count by default. Used for non-cancellable background operations.
Note: Command.async and Command.stream spawn dedicated threads (not from this pool) because they need individual cancel handles. The pool is used by the test framework and other non-cancellable work.
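The worker loop itself lives in private helpers (spawn_worker and default_size) whose bodies are not shown on this page. The following is a minimal sketch of how such a pool could work, not the library's actual implementation: SketchPool is a hypothetical stand-in, and its spawn_worker (pop, stop on the :shutdown sentinel, rescue task errors) and default_size (Etc.nprocessors) are assumptions.

```ruby
require "etc"

# Hypothetical stand-in mirroring the documented public methods.
class SketchPool
  attr_reader :size

  def initialize(size: nil)
    @size = size || default_size
    @queue = Thread::Queue.new
    @shutdown = false
    @workers = @size.times.map { spawn_worker }
  end

  def post(&block)
    raise "SketchPool is shut down" if @shutdown
    @queue.push(block)
  end

  def shutdown(timeout: 5)
    return if @shutdown
    @shutdown = true
    # One sentinel per worker, queued behind any pending tasks.
    @size.times { @queue.push(:shutdown) }
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @workers.each do |t|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      t.join([remaining, 0].max)
    end
  end

  private

  # Assumption: one worker per logical CPU.
  def default_size
    Etc.nprocessors
  end

  # Assumption: block on the queue, exit on the sentinel, and keep the
  # worker alive when a task raises.
  def spawn_worker
    Thread.new do
      loop do
        task = @queue.pop
        break if task == :shutdown
        begin
          task.call
        rescue StandardError => e
          warn "task failed: #{e.message}"
        end
      end
    end
  end
end

# post does not hand back the block's result, so collect results in a queue.
results = Thread::Queue.new
pool = SketchPool.new(size: 2)
5.times { |i| pool.post { results.push(i * i) } }
pool.shutdown
squares = Array.new(results.size) { results.pop }.sort
```

In this sketch the sentinels sit behind the queued tasks in a FIFO queue, so work posted before shutdown still runs, provided it finishes within the timeout.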
Instance Attribute Summary
- #size ⇒ Integer (readonly)
  Number of worker threads.
Instance Method Summary
- #initialize(size: nil) ⇒ ThreadPool (constructor)
  Create a new thread pool.
- #post { ... } ⇒ Object
  Queue a block for execution by a worker thread.
- #shutdown(timeout: 5) ⇒ Object
  Shut down the pool.
- #shutdown? ⇒ Boolean
  True if the pool has been shut down.
Constructor Details
#initialize(size: nil) ⇒ ThreadPool
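Create a new thread pool with size worker threads. When size is nil, the pool falls back to its default size (the CPU count, per the overview).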
    # File 'lib/plushie/thread_pool.rb', line 27
    def initialize(size: nil)
      @size = size || default_size
      @queue = Thread::Queue.new
      @shutdown = false
      @workers = @size.times.map { spawn_worker }
    end
Instance Attribute Details
#size ⇒ Integer (readonly)
Returns the number of worker threads.
    # File 'lib/plushie/thread_pool.rb', line 22
    def size
      @size
    end
Instance Method Details
#post { ... } ⇒ Object
Queue a block for execution by a worker thread. Raises RuntimeError if the pool has already been shut down.
    # File 'lib/plushie/thread_pool.rb', line 38
    def post(&block)
      raise "ThreadPool is shut down" if @shutdown
      @queue.push(block)
    end
#shutdown(timeout: 5) ⇒ Object
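Shut down the pool. Pushes one :shutdown sentinel per worker, then waits for the workers to finish their current work. The wait is bounded by a single deadline of timeout seconds shared across all workers; threads still running at the deadline are not killed. Calling shutdown again is a no-op.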
    # File 'lib/plushie/thread_pool.rb', line 47
    def shutdown(timeout: 5)
      return if @shutdown
      @shutdown = true
      @size.times { @queue.push(:shutdown) }
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      @workers.each do |t|
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        t.join([remaining, 0].max)
      end
    end
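The shutdown listing computes one deadline up front and joins each worker with whatever time remains, so the total wait stays near timeout rather than growing with the number of workers. The sketch below isolates that pattern using plain threads; join_all is a hypothetical helper written for illustration and is not part of Plushie.

```ruby
# Join every thread against one shared deadline.
# Returns the threads that were still running when their join gave up.
def join_all(threads, timeout:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.reject do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Thread#join(limit) returns the thread if it finished in time,
    # or nil if the limit expired first.
    t.join([remaining, 0].max)
  end
end

fast = Thread.new { :done }
slow = [Thread.new { sleep 2 }, Thread.new { sleep 2 }]

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
stragglers = join_all([fast, *slow], timeout: 0.2)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# The stragglers are still alive; clean them up so the script exits promptly.
slow.each(&:kill)
```

Here the first slow thread uses up the remaining budget, and the second is joined with a limit of zero, which is why clamping with [remaining, 0].max matters: a negative limit would otherwise be passed to join.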
#shutdown? ⇒ Boolean
Returns true if the pool has been shut down.
    # File 'lib/plushie/thread_pool.rb', line 59
    def shutdown?
      @shutdown
    end