Class: Plushie::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/plushie/thread_pool.rb

Overview

Simple bounded thread pool for background work.

Worker threads pull tasks from a shared queue. Sized to CPU count by default. Used for non-cancellable background operations.

Note: Command.async and Command.stream spawn dedicated threads (not from this pool) because they need individual cancel handles. The pool is used by the test framework and other non-cancellable work.

Examples:

pool = ThreadPool.new(size: 4)
pool.post { expensive_work() }
pool.shutdown

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size: nil) ⇒ ThreadPool

Create a new thread pool.

Parameters:

  • size (Integer, nil) (defaults to: nil)

    number of workers (default: CPU count)



27
28
29
30
31
32
# File 'lib/plushie/thread_pool.rb', line 27

def initialize(size: nil)
  @size = size || default_size
  @queue = Thread::Queue.new
  @shutdown = false
  @workers = @size.times.map { spawn_worker }
end

Instance Attribute Details

#sizeInteger (readonly)

Returns number of worker threads.

Returns:

  • (Integer)

    number of worker threads



22
23
24
# File 'lib/plushie/thread_pool.rb', line 22

def size
  @size
end

Instance Method Details

#post { ... } ⇒ Object

Queue a block for execution by a worker thread.

Yields:

  • the work to execute

Raises:

  • (RuntimeError)

    if the pool has been shut down



38
39
40
41
# File 'lib/plushie/thread_pool.rb', line 38

def post(&block)
  raise "ThreadPool is shut down" if @shutdown
  @queue.push(block)
end

#shutdown(timeout: 5) ⇒ Object

Shut down the pool. Signals all workers to stop and waits for them to finish current work.

Parameters:

  • timeout (Numeric) (defaults to: 5)

    max seconds to wait per worker (default: 5)



47
48
49
50
51
52
53
54
55
56
# File 'lib/plushie/thread_pool.rb', line 47

def shutdown(timeout: 5)
  return if @shutdown
  @shutdown = true
  @size.times { @queue.push(:shutdown) }
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @workers.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    t.join([remaining, 0].max)
  end
end

#shutdown?Boolean

Returns true if the pool has been shut down.

Returns:

  • (Boolean)

    true if the pool has been shut down



59
60
61
# File 'lib/plushie/thread_pool.rb', line 59

def shutdown?
  @shutdown
end