Class: Workhorse::Pool

Inherits:
Object
Defined in:
lib/workhorse/pool.rb

Overview

Thread pool abstraction used by workers for concurrent job execution. Wraps Concurrent::ThreadPoolExecutor behind a smaller interface: #post raises instead of queueing when every thread is busy, and an optional #on_idle callback runs when a thread becomes free.

Examples:

Basic usage

pool = Workhorse::Pool.new(4)
pool.post { puts "Working..." }
pool.shutdown

Constructor Details

#initialize(size) ⇒ Pool

Creates a new thread pool with the specified size.

Parameters:

  • size (Integer)

    Maximum number of threads in the pool



# File 'lib/workhorse/pool.rb', line 20

def initialize(size)
  @size = size
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads:     0,
    max_threads:     @size,
    max_queue:       0,
    fallback_policy: :abort,
    auto_terminate:  false
  )
  @mutex = Mutex.new
  @active_threads = Concurrent::AtomicFixnum.new(0)
  @on_idle = nil
end

Instance Attribute Details

#active_threads ⇒ Concurrent::AtomicFixnum (readonly)

Returns the thread-safe counter of active threads.

Returns:

  • (Concurrent::AtomicFixnum)

    Thread-safe counter of active threads



# File 'lib/workhorse/pool.rb', line 15

def active_threads
  @active_threads
end

#mutex ⇒ Mutex (readonly)

Returns the mutex that #post uses to serialize the idle check and the submission of work.

Returns:

  • (Mutex)

    Synchronization mutex for thread safety



# File 'lib/workhorse/pool.rb', line 12

def mutex
  @mutex
end

Instance Method Details

#idle ⇒ Integer

Returns the number of idle threads in the pool.

Returns:

  • (Integer)

    Number of idle threads



# File 'lib/workhorse/pool.rb', line 71

def idle
  @size - @active_threads.value
end
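The subtraction in #idle can be illustrated without the gem. The sketch below is a stdlib-only stand-in: IdleCounter is a hypothetical name, and a Mutex-guarded integer takes the place of Concurrent::AtomicFixnum. It only mirrors the arithmetic (configured size minus active work units) and is not the Workhorse implementation.

```ruby
# IdleCounter is a hypothetical stand-in, not part of Workhorse.
# A Mutex-guarded Integer plays the role of Concurrent::AtomicFixnum.
class IdleCounter
  def initialize(size)
    @size = size
    @active = 0
    @lock = Mutex.new
  end

  def increment
    @lock.synchronize { @active += 1 }
  end

  def decrement
    @lock.synchronize { @active -= 1 }
  end

  # Same arithmetic as Pool#idle: configured size minus active work units.
  def idle
    @lock.synchronize { @size - @active }
  end
end

counter = IdleCounter.new(4)
counter.increment                       # first work unit starts
counter.increment                       # second work unit starts
idle_after_two_posts = counter.idle
counter.decrement                       # one work unit finishes
idle_after_one_finishes = counter.idle
```

With a size of 4, two running work units leave two idle slots, and finishing one of them brings the count back up to three.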

#on_idle { ... } ⇒ void

This method returns an undefined value.

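Sets a callback that the pool invokes when a thread becomes free. As the #post source shows, the callback runs after each work unit finishes, once the active-thread counter has been decremented.

Yields:

  • Block to execute each time a thread becomes idle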



# File 'lib/workhorse/pool.rb', line 38

def on_idle(&block)
  @on_idle = block
end
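A minimal sketch of how such a callback can be wired, assuming plain Ruby threads in place of the executor. CallbackPool and its join helper are hypothetical names used only for illustration. The safe-navigation call `&.call` stands in for the `try(:call)` used in the real #post.

```ruby
# CallbackPool is a hypothetical stand-in, not the Workhorse class.
class CallbackPool
  def initialize
    @on_idle = nil
    @threads = []
  end

  # Stores the block, as Pool#on_idle does.
  def on_idle(&block)
    @on_idle = block
  end

  def post(&block)
    @threads << Thread.new do
      begin
        block.call
      ensure
        # Runs after every work unit; does nothing if no callback was set.
        @on_idle&.call
      end
    end
  end

  # Hypothetical helper: waits for all posted work to finish.
  def join
    @threads.each(&:join)
  end
end

notifications = Queue.new
pool = CallbackPool.new
pool.on_idle { notifications << :slot_free }
pool.post { 1 + 1 }
pool.post { 2 + 2 }
pool.join
```

Each finished work unit pushes one notification, so a caller can use the callback to learn that a slot is free and post the next job.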

#post { ... } ⇒ void

This method returns an undefined value.

Posts a new work unit to the pool for execution.

Yields:

  • The work block to execute

Raises:

  • (RuntimeError)

    If all threads are busy



# File 'lib/workhorse/pool.rb', line 47

def post
  mutex.synchronize do
    if idle.zero?
      fail 'All threads are busy.'
    end

    active_threads = @active_threads

    active_threads.increment

    @executor.post do
      begin # rubocop:disable Style/RedundantBegin
        yield
      ensure
        active_threads.decrement
        @on_idle.try(:call)
      end
    end
  end
end
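Because #post raises rather than queues when no thread is idle, callers need to rescue the error or check #idle first. The following stdlib-only sketch mirrors that reject-when-full logic. TinyPool and its join helper are hypothetical stand-ins that use plain threads instead of Concurrent::ThreadPoolExecutor.

```ruby
# TinyPool is a hypothetical stand-in that mirrors the reject-when-full
# logic of Pool#post; it is not the Workhorse class.
class TinyPool
  def initialize(size)
    @size = size
    @active = 0
    @mutex = Mutex.new
    @threads = []
  end

  def idle
    @size - @active
  end

  def post(&block)
    @mutex.synchronize do
      raise 'All threads are busy.' if idle.zero?

      @active += 1
      @threads << Thread.new do
        begin
          block.call
        ensure
          # Runs on the worker thread, so it waits for the lock held above.
          @mutex.synchronize { @active -= 1 }
        end
      end
    end
  end

  # Hypothetical helper: waits for all posted work to finish.
  def join
    @threads.each(&:join)
  end
end

pool = TinyPool.new(1)
gate = Queue.new
pool.post { gate.pop }              # occupies the only slot until released

rejected = false
begin
  pool.post { :never_runs }
rescue RuntimeError => e
  rejected = (e.message == 'All threads are busy.')
end

gate << :go                         # let the first work unit finish
pool.join
```

The counter is incremented inside the lock before the work starts, so the second call sees zero idle slots and is rejected deterministically.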

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the pool and waits for termination. All currently executing jobs will complete before shutdown.



# File 'lib/workhorse/pool.rb', line 94

def shutdown
  @executor.shutdown
  wait
end

#wait ⇒ void

This method returns an undefined value.

Waits until the pool is shut down. This will wait forever unless you eventually call #shutdown (either before calling wait or after it in another thread).



# File 'lib/workhorse/pool.rb', line 80

def wait
  # Here we use a loop-sleep combination instead of using
  # ThreadPoolExecutor's `wait_for_termination`. See issue #21 for more
  # information.
  loop do
    break if @executor.shutdown?
    sleep 0.1
  end
end
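The loop-sleep pattern used by #wait can be tried in isolation. In this sketch the local variable `stopped` is a hypothetical flag standing in for `@executor.shutdown?`, and a helper thread flips it after a short delay to play the role of another thread calling #shutdown.

```ruby
# `stopped` is a hypothetical flag standing in for @executor.shutdown?.
stopped = false
polls = 0

# Plays the role of another thread that eventually calls #shutdown.
stopper = Thread.new do
  sleep 0.3
  stopped = true
end

# Same shape as Pool#wait: check the flag, otherwise sleep and retry.
loop do
  break if stopped

  polls += 1
  sleep 0.1
end

stopper.join
```

Polling trades a little latency (up to one sleep interval) for simplicity. If nothing ever sets the flag, the loop never exits, which matches the warning that #wait blocks forever unless #shutdown is called.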