Class: Workhorse::Pool
- Inherits: Object
  (hierarchy: Object, then Workhorse::Pool)
- Defined in: lib/workhorse/pool.rb
Overview
Abstraction layer of a simple thread pool implementation used by the worker.
Instance Attribute Summary
- #mutex ⇒ Object (readonly)
  Returns the value of attribute mutex.
Instance Method Summary
- #idle ⇒ Object
  Returns the number of idle threads.
- #initialize(size) ⇒ Pool (constructor)
  A new instance of Pool.
- #post ⇒ Object
  Posts a new work unit to the pool.
- #shutdown ⇒ Object
  Shuts down the pool and waits for termination.
- #wait ⇒ Object
  Blocks until the pool's executor has terminated.
Constructor Details
#initialize(size) ⇒ Pool
Returns a new instance of Pool.
    # File 'lib/workhorse/pool.rb', line 6

    def initialize(size)
      @size = size
      @executor = Concurrent::ThreadPoolExecutor.new(
        min_threads: 0,
        max_threads: @size,
        max_queue: 0,
        fallback_policy: :abort,
        auto_terminate: false
      )
      @mutex = Mutex.new
      @active_threads = Concurrent::AtomicFixnum.new(0)
    end
Instance Attribute Details
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
    # File 'lib/workhorse/pool.rb', line 4

    def mutex
      @mutex
    end
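The source of #post (listed under Instance Method Details) wraps its body in mutex.synchronize, and Ruby's Mutex is not reentrant. A caller that already holds this mutex should therefore avoid calling #post from the same thread. The sketch below illustrates the pitfall with the standard library only. `lock` is a hypothetical stand-in for `pool.mutex`, and no Workhorse code is involved.

```ruby
# `lock` plays the role of pool.mutex (hypothetical stand-in).
lock = Mutex.new
error = nil

lock.synchronize do
  begin
    # Stands in for a nested call that synchronizes on the same mutex,
    # as #post does internally.
    lock.synchronize { :unreachable }
  rescue ThreadError => e
    # Ruby refuses to lock a Mutex twice from the same thread.
    error = e
  end
end

puts error.class
```

The outer lock is still released normally after the rescue, so the mutex remains usable afterwards.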
Instance Method Details
#idle ⇒ Object
Returns the number of idle threads.
    # File 'lib/workhorse/pool.rb', line 41

    def idle
      @size - @active_threads.value
    end
#post ⇒ Object
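Posts a new work unit (the given block) to the pool. Raises a RuntimeError with the message 'All threads are busy.' if no thread is idle.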
    # File 'lib/workhorse/pool.rb', line 20

    def post
      mutex.synchronize do
        if idle.zero?
          fail 'All threads are busy.'
        end

        active_threads = @active_threads
        active_threads.increment

        @executor.post do
          begin
            yield
          ensure
            active_threads.decrement
          end
        end
      end
    end
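To make the bookkeeping in #post easier to follow, here is a minimal stand-in that uses only Ruby's standard library. It is a sketch, not the Workhorse implementation. `SketchPool`, `gate`, and the other local names are hypothetical, one plain Thread per work unit replaces Concurrent::ThreadPoolExecutor, and a Mutex-guarded Integer replaces Concurrent::AtomicFixnum.

```ruby
# Hypothetical stand-in for Workhorse::Pool (standard library only).
class SketchPool
  attr_reader :mutex

  def initialize(size)
    @size = size
    @mutex = Mutex.new
    @active = 0                   # number of busy threads
    @counter_lock = Mutex.new     # guards @active (replaces AtomicFixnum)
    @threads = []
  end

  # Number of threads that could still accept a work unit.
  def idle
    @size - @counter_lock.synchronize { @active }
  end

  # Rejects the work unit when no thread is idle; otherwise marks one
  # thread as busy before the work starts and frees it when the work ends.
  def post(&block)
    mutex.synchronize do
      fail 'All threads are busy.' if idle.zero?

      @counter_lock.synchronize { @active += 1 }
      @threads << Thread.new do
        begin
          block.call
        ensure
          @counter_lock.synchronize { @active -= 1 }
        end
      end
    end
  end

  # Blocks until every posted work unit has finished.
  def wait
    @threads.each(&:join)
  end
end

pool = SketchPool.new(2)
gate = Queue.new                  # keeps workers busy until released

2.times { pool.post { gate.pop } }
idle_when_full = pool.idle

rejection = begin
  pool.post { :never_runs }
  nil
rescue RuntimeError => e
  e.message
end

2.times { gate.push(:go) }        # release both workers
pool.wait
idle_after_wait = pool.idle

puts idle_when_full, rejection, idle_after_wait
```

Because the counter is incremented inside #post rather than inside the worker, `idle` already reflects the new work unit when #post returns, which is what lets a caller rely on `idle` to decide whether another unit can be posted.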
#shutdown ⇒ Object
Shuts down the pool and waits for termination.
    # File 'lib/workhorse/pool.rb', line 50

    def shutdown
      @executor.shutdown
      wait
    end
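The two steps in #shutdown (stop accepting work, then block until outstanding work has finished) can be illustrated with a standard-library analogy. This is only a sketch under the assumption that the executor's shutdown lets already-posted work complete. The names `jobs`, `worker`, and `finished` are hypothetical, and a closed Queue with a single consumer thread stands in for the executor.

```ruby
jobs = Queue.new
finished = []

worker = Thread.new do
  # Queue#pop returns nil once the queue is closed and drained,
  # which ends the loop.
  while (job = jobs.pop)
    finished << job.call
  end
end

jobs << -> { 1 + 1 }
jobs << -> { 2 * 3 }

jobs.close     # step 1: refuse new work, keep what is already queued
worker.join    # step 2: block until the remaining work is done

late_accepted = begin
  jobs << -> { :late }
  true
rescue ClosedQueueError
  false
end

puts finished.inspect, late_accepted
```

Work queued before the close still runs, while work offered afterwards is refused, which mirrors the order of the two calls in #shutdown.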
#wait ⇒ Object
Blocks until the pool's executor has terminated. It does not initiate a shutdown itself.
    # File 'lib/workhorse/pool.rb', line 45

    def wait
      @executor.wait_for_termination
    end