Class: Webhookdb::Concurrent::ParallelizedPool
Defined in: lib/webhookdb/concurrent.rb
Overview
Pool that does work across a given number of threads. queue_size is how many items can be in the queue before post blocks. threads defaults to queue_size, allowing at most queue_size tasks to run concurrently, which fits the idea of a parallelized pool well.
If you want the calling thread to queue up a bunch of work ahead of time, you can use a Concurrent::ThreadPoolExecutor instead. This pool will not allow more work to be enqueued while the queue is full.
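The blocking behavior described above comes from the bounded queue. The following is a minimal sketch using only Ruby's standard Thread::SizedQueue rather than the pool itself; the variable names are illustrative and not part of Webhookdb.

```ruby
# A bounded queue with room for 2 items (plays the role of queue_size).
queue = Thread::SizedQueue.new(2)

queue.push(:a)
queue.push(:b)

# The queue is now full. A plain push would block the caller here until a
# worker pops an item. A non-blocking push raises ThreadError instead, which
# lets this sketch observe the "full" state without hanging.
full = begin
  queue.push(:c, true)
  false
rescue ThreadError
  true
end

queue.pop             # a worker taking an item frees one slot
queue.push(:c, true)  # now succeeds without blocking
```

Because the producer cannot run ahead of the consumers by more than queue_size items, the amount of pending work held in memory stays bounded.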
Constant Summary
- STOP = :stop
Instance Method Summary
- #initialize(queue_size, timeout: nil, threads: nil) ⇒ ParallelizedPool (constructor): A new instance of ParallelizedPool.
- #join ⇒ Object
- #post(&task) ⇒ Object
Constructor Details
#initialize(queue_size, timeout: nil, threads: nil) ⇒ ParallelizedPool
Returns a new instance of ParallelizedPool.
# File 'lib/webhookdb/concurrent.rb', line 52
def initialize(queue_size, timeout: nil, threads: nil)
  super()
  threads ||= queue_size
  @timeout = timeout
  @threads = (1..threads).map do
    Thread.new do
      loop { break unless self.do_work }
    end
  end
  @queue = Thread::SizedQueue.new(queue_size)
  @exception = nil
end
Instance Method Details
#join ⇒ Object
# File 'lib/webhookdb/concurrent.rb', line 88
def join
  @queue.push(STOP)
  @threads.each(&:join)
  raise @exception if @exception
end
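join pushes a single STOP sentinel and then waits for every worker thread. The worker loop (do_work) is not shown on this page, so the sketch below is an assumption about how one sentinel can stop several workers: the worker that pops STOP closes the queue, and the remaining workers then receive nil from pop and exit. It uses only the standard library, and all names other than STOP are illustrative.

```ruby
STOP = :stop
queue = Thread::SizedQueue.new(2)
results = Queue.new

workers = (1..2).map do
  Thread.new do
    loop do
      task = queue.pop   # returns nil once the queue is closed and empty
      break if task.nil?
      if task == STOP
        queue.close      # wakes any worker still blocked in pop
        break
      end
      results << task.call
    end
  end
end

# Producer side: push blocks whenever both slots are taken.
4.times { |i| queue.push(-> { i * 10 }) }

# Equivalent of join: one sentinel, then wait for all workers.
queue.push(STOP)
workers.each(&:join)

collected = []
collected << results.pop until results.empty?
collected.sort!
```

Since the queue is first-in, first-out, the sentinel is only popped after every earlier task has been taken by some worker, so joining the threads waits for all posted work to finish.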
#post(&task) ⇒ Object
# File 'lib/webhookdb/concurrent.rb', line 81
def post(&task)
  raise @exception if @exception
  added = @queue.push(task, timeout: @timeout)
  raise Timeout, "waited #{@timeout} to add to the queue" if added.nil?
  return true
end
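post relies on Thread::SizedQueue#push returning nil when the timeout elapses without a free slot. The timeout: keyword is available in Ruby 3.2 and later. The sketch below isolates that step; SketchTimeout and post_with_timeout are hypothetical names used for illustration and are not part of Webhookdb.

```ruby
# Hypothetical error class standing in for the pool's timeout error.
class SketchTimeout < StandardError; end

# Push a task, converting a timed-out push (nil return) into an exception.
def post_with_timeout(queue, task, timeout)
  added = queue.push(task, timeout: timeout)
  raise SketchTimeout, "waited #{timeout} to add to the queue" if added.nil?
  true
end

queue = Thread::SizedQueue.new(1)
first = post_with_timeout(queue, :first, 0.05) # slot available, returns true

# No worker is draining the queue, so the second push times out.
timed_out = begin
  post_with_timeout(queue, :second, 0.05)
  false
rescue SketchTimeout
  true
end
```

With a timeout of nil, push waits indefinitely, so a caller only sees the timeout error when a timeout was passed to the constructor.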