Class: VCAP::Concurrency::ThreadPool
- Inherits:
-
Object
- Object
- VCAP::Concurrency::ThreadPool
- Defined in:
- lib/vcap/concurrency/thread_pool.rb
Constant Summary collapse
- STOP_SENTINEL =
:stop
- STATE_CREATED =
0
- STATE_STARTED =
1
- STATE_STOPPED =
2
Instance Method Summary collapse
-
#enqueue(&blk) ⇒ VCAP::Concurrent::Promise
Adds a block that will be executed by a worker thread.
-
#initialize(num_threads) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
-
#join ⇒ Object
Waits for all worker threads to finish executing.
-
#num_active_tasks ⇒ Integer
Returns the number of tasks that are currently running.
-
#num_queued_tasks ⇒ Integer
Returns the number of tasks waiting to be processed.
-
#shutdown ⇒ Object
Queues up sentinel values to notify workers to stop, then waits for them to finish.
-
#start ⇒ Object
Creates all threads in the pool and starts them.
-
#stop ⇒ Object
Stops the thread pool politely, allowing existing work to be completed.
Constructor Details
#initialize(num_threads) ⇒ ThreadPool
Returns a new instance of ThreadPool.
18 19 20 21 22 23 24 25 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 18 def initialize(num_threads) @num_threads = num_threads @threads = [] @work_queue = Queue.new @state = STATE_CREATED @pool_lock = Mutex.new @num_active_tasks = VCAP::Concurrency::AtomicVar.new(0) end |
Instance Method Details
#enqueue(&blk) ⇒ VCAP::Concurrent::Promise
Adds a block that will be executed by a worker thread.
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 50 def enqueue(&blk) @pool_lock.synchronize do assert_state_in(STATE_CREATED, STATE_STARTED) promise = VCAP::Concurrency::Promise.new @work_queue.enq([blk, promise]) promise end end |
#join ⇒ Object
Waits for all worker threads to finish executing.
74 75 76 77 78 79 80 81 82 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 74 def join @pool_lock.synchronize do assert_state_in(STATE_STARTED, STATE_STOPPED) end @threads.each { |t| t.join } nil end |
#num_active_tasks ⇒ Integer
Returns the number of tasks that are currently running. This is equivalent to the number of active threads.
97 98 99 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 97 def num_active_tasks @num_active_tasks.value end |
#num_queued_tasks ⇒ Integer
Returns the number of tasks waiting to be processed
NB: While technically correct, this will include the number of unprocessed
sentinel tasks after stop() is called.
107 108 109 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 107 def num_queued_tasks @work_queue.length end |
#shutdown ⇒ Object
Queues up sentinel values to notify workers to stop, then waits for them to finish.
86 87 88 89 90 91 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 86 def shutdown stop join nil end |
#start ⇒ Object
Creates all threads in the pool and starts them. Tasks that were enqueued prior to starting the pool will be processed immediately.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 29 def start @pool_lock.synchronize do assert_state_in(STATE_CREATED) @num_threads.times do @threads << create_worker_thread end @state = STATE_STARTED end nil end |
#stop ⇒ Object
Stops the thread pool politely, allowing existing work to be completed.
63 64 65 66 67 68 69 70 71 |
# File 'lib/vcap/concurrency/thread_pool.rb', line 63 def stop @pool_lock.synchronize do @num_threads.times { @work_queue.enq(STOP_SENTINEL) } @state = STATE_STOPPED end nil end |