Class: Webhookdb::Concurrent::ParallelizedPool

Inherits:
Pool
  • Object
Defined in:
lib/webhookdb/concurrent.rb

Overview

Pool that does work across a given number of threads. queue_size is how many items can be in the queue before post blocks. threads defaults to queue_size, so at most queue_size tasks run concurrently, which fits the idea of a parallelized pool well.

If you want the calling thread to queue up a bunch of work ahead of time, you can use a Concurrent::ThreadPoolExecutor instead. This pool will not allow more work to be enqueued while the queue is full.
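The back-pressure described above comes from the bounded queue. The following is a minimal sketch using only Ruby's Thread::SizedQueue, not this class; the variable names are illustrative. It uses a non-blocking push to make the capacity limit visible, whereas a blocking push (the kind post performs) would wait for a worker to free a slot.

```ruby
# Illustration only: a bounded queue with room for two items.
queue = Thread::SizedQueue.new(2)

queue.push(:a)
queue.push(:b)

# The queue is now full. A blocking push would wait for a consumer here;
# a non-blocking push (second argument true) raises ThreadError instead.
begin
  queue.push(:c, true)
  overflowed = false
rescue ThreadError
  overflowed = true
end

# Once a consumer takes an item, there is room again.
queue.pop
queue.push(:c, true)
final_size = queue.size
```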

Constant Summary

STOP = :stop

Instance Method Summary

Constructor Details

#initialize(queue_size, timeout: nil, threads: nil) ⇒ ParallelizedPool

Returns a new instance of ParallelizedPool.



# File 'lib/webhookdb/concurrent.rb', line 52

def initialize(queue_size, timeout: nil, threads: nil)
  super()
  threads ||= queue_size
  @timeout = timeout
  @threads = (1..threads).map do
    Thread.new do
      loop { break unless self.do_work }
    end
  end
  @queue = Thread::SizedQueue.new(queue_size)
  @exception = nil
end

Instance Method Details

#join ⇒ Object

Raises:

  • (@exception)


# File 'lib/webhookdb/concurrent.rb', line 88

def join
  @queue.push(STOP)
  @threads.each(&:join)
  raise @exception if @exception
end
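The listing above relies on a do_work method that is not shown on this page. The following self-contained sketch shows one way such a pool could behave end to end. SketchPool and its do_work are assumptions made for illustration, not the library's implementation: the worker puts the STOP sentinel back so that a single push in join can stop every thread, and it records the first exception so join can re-raise it.

```ruby
# Stand-in for illustration only; SketchPool and do_work are hypothetical.
class SketchPool
  STOP = :stop

  def initialize(queue_size, threads: nil)
    threads ||= queue_size
    # In this sketch the queue is created before the workers start,
    # so a worker can never observe an unset @queue.
    @queue = Thread::SizedQueue.new(queue_size)
    @exception = nil
    @threads = (1..threads).map do
      Thread.new do
        loop { break unless do_work }
      end
    end
  end

  def post(&task)
    raise @exception if @exception
    @queue.push(task)
    true
  end

  def join
    @queue.push(STOP)
    @threads.each(&:join)
    raise @exception if @exception
  end

  private

  # Assumed worker step: returns false when the worker should exit.
  def do_work
    task = @queue.pop
    if task == STOP
      # Put the sentinel back so the remaining workers also see it.
      @queue.push(STOP)
      return false
    end
    task.call
    true
  rescue StandardError => e
    # Remember the first failure; join (or the next post) re-raises it.
    @exception ||= e
    true
  end
end

# Successful run: four tasks spread across two worker threads.
results = Queue.new
pool = SketchPool.new(2)
4.times { |i| pool.post { results << i * 2 } }
pool.join
doubled = Array.new(results.size) { results.pop }.sort

# Failing run: the task's exception surfaces from join.
failing = SketchPool.new(1)
failing.post { raise ArgumentError, "boom" }
error = begin
  failing.join
  nil
rescue ArgumentError => e
  e
end
```

Under these assumptions, join is the point where the caller learns about a failed task, which matches the Raises note on #join.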

#post(&task) ⇒ Object

Raises:

  • (@exception)


# File 'lib/webhookdb/concurrent.rb', line 81

def post(&task)
  raise @exception if @exception
  added = @queue.push(task, timeout: @timeout)
  raise Timeout, "waited #{@timeout} to add to the queue" if added.nil?
  return true
end
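The nil check in post depends on how Thread::SizedQueue#push behaves when given timeout:, which is available from Ruby 3.2. A small sketch of that behavior on a bare queue, with no worker consuming from it (the variable names are illustrative):

```ruby
# Requires Ruby 3.2 or later, where SizedQueue#push accepts timeout:.
queue = Thread::SizedQueue.new(1)

# Room available: push succeeds immediately and returns the queue itself.
first = queue.push(:task, timeout: 0.05)

# Queue full and nothing is popping: push gives up after the timeout
# and returns nil, which is the case post turns into a Timeout error.
second = queue.push(:task, timeout: 0.05)
```

When timeout is nil, as it is by default in the constructor, push simply blocks until a slot is free and the nil branch is never taken.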