Class: Traject::ThreadPool
- Inherits: Object
- Defined in: lib/traject/thread_pool.rb
Overview
2) If initialized with nil (or 0) for the thread count, no thread pool is
actually created, and all thread-pool-related methods become no-ops. We call
this the nil/null threadpool. A non-nil threadpool requires JRuby, but you can
create a null Traject::ThreadPool.new(nil) under MRI without anything
complaining.
3) Use the #maybe_in_thread_pool method to send blocks to the thread pool for
execution. If no threadpool is configured, your block will just be
executed in the calling thread. Be careful not to refer to any non-local
variables in the block, unless the variable holds an object you can
use thread-safely!
4) Thread pools are java.util.concurrent.ThreadPoolExecutor instances, manually created
with a work queue that will buffer up to (pool_size*3) tasks. If the queue is full,
the ThreadPoolExecutor is set up to use the ThreadPoolExecutor.CallerRunsPolicy,
meaning the block will end up executing in the caller's own thread. With the kind
of work we're doing, where each unit of work is small and there are many of them,
the CallerRunsPolicy serves as an effective 'back pressure' mechanism to keep
the work queue from getting too large and exhausting memory when producers are
faster than consumers.
5) Any exceptions raised by pool-executed work are captured and accumulated in a
thread-safe manner, and can be re-raised in the thread of your choice by calling
#raise_collected_exception!
6) When you are done with the threadpool, you can and must call
#shutdown_and_wait, which will wait for all currently queued work
to complete, then return. You cannot give any more work to the pool
after you do this. It waits up to one day for the work to finish, which is
effectively forever and should be fine. If you never call shutdown, the pool
will keep running forever and not allow your program to exit!
7) We will keep track of the total number of times a block is run in the thread pool, and
the total elapsed (wall) time of running all blocks, so an average_execution_ms
time can be given. #average_execution_ms may be inaccurate if called while
threads are still executing, as it's not entirely thread-safe (the total
iteration count may be off by one).
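The back-pressure behaviour described in point 4 can be illustrated without JRuby. What follows is only a minimal sketch under stated assumptions: `CallerRunsSketch` and its `submit` method are hypothetical names invented for this example, and Ruby's `SizedQueue` stands in for the Java `ArrayBlockingQueue`. It is not how Traject::ThreadPool is implemented; it only mimics the "run in the caller when the queue is full" idea.

```ruby
# Hypothetical stand-in for illustration, not Traject::ThreadPool itself.
class CallerRunsSketch
  STOP = Object.new # sentinel that tells a worker thread to exit

  def initialize(pool_size)
    @queue = SizedQueue.new(pool_size * 3) # bounded buffer, 3x the pool size
    @workers = Array.new(pool_size) do
      Thread.new do
        # Jobs are assumed not to raise; a real pool would rescue and collect.
        until (job = @queue.pop).equal?(STOP)
          job.call
        end
      end
    end
  end

  # Returns :queued if a worker will run the block, :caller_ran if the queue
  # was full and the block ran in the calling thread instead.
  def submit(&job)
    @queue.push(job, true) # non-blocking push raises ThreadError when full
    :queued
  rescue ThreadError
    job.call
    :caller_ran
  end

  # Let queued work drain, then stop and join every worker.
  def shutdown_and_wait
    @workers.size.times { @queue.push(STOP) }
    @workers.each(&:join)
  end
end

pool    = CallerRunsSketch.new(1)
started = Queue.new
gate    = Queue.new
done    = Queue.new

pool.submit { started << true; gate.pop } # occupy the only worker
started.pop                               # wait until the worker is busy
statuses = Array.new(4) { |i| pool.submit { done << i } }
gate << true                              # release the worker
pool.shutdown_and_wait
p statuses
```

With one worker held busy, the queue has room for three jobs, so the fourth submission is the one that runs in the calling thread. That slows the producer down exactly when the consumers are behind.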
Instance Attribute Summary
- #label ⇒ Object (readonly)
  Returns the value of attribute label.
- #pool_size ⇒ Object (readonly)
  Returns the value of attribute pool_size.
- #queue_capacity ⇒ Object (readonly)
  Returns the value of attribute queue_capacity.
Instance Method Summary
- #collect_exception(e) ⇒ Object
  Thread-safe way of storing an exception, to raise later in a different thread.
- #initialize(pool_size) ⇒ ThreadPool (constructor)
  First argument is the pool size. If it is 0 or nil, this will be a null/no-op pool.
- #maybe_in_thread_pool ⇒ Object
  Pass it a block, which MAYBE gets executed in the background in a thread pool.
- #queue ⇒ Object
  Just for monitoring/debugging purposes, we’ll return the work queue used by the threadpool.
- #raise_collected_exception! ⇒ Object
  If there’s a stored collected exception, raise it again now.
- #shutdown_and_wait ⇒ Object
  Shut down the threadpool, and wait for all work to complete.
Constructor Details
#initialize(pool_size) ⇒ ThreadPool
First argument is the pool size. If it is 0 or nil, this will be a null/no-op pool.
    # File 'lib/traject/thread_pool.rb', line 49

    def initialize(pool_size)
      unless pool_size.nil? || pool_size == 0
        require 'java' # trigger an exception now if we're not jruby

        @label = label

        @pool_size = pool_size.to_i # just for reflection, we don't really need it again
        @queue_capacity = pool_size * 3

        blockingQueue = java.util.concurrent.ArrayBlockingQueue.new(@queue_capacity)
        rejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy.new

        # keepalive times don't matter, we are setting core and max pool to
        # same thing, fixed size pool.
        @thread_pool = java.util.concurrent.ThreadPoolExecutor.new(
          @pool_size, @pool_size, 0, java.util.concurrent.TimeUnit::MILLISECONDS,
          blockingQueue, rejectedExecutionHandler)

        # A thread-safe queue to collect exceptions cross-threads.
        # We make it small, we really only need to store the first
        # exception, we don't care too much about others. But we'll
        # keep the first 20, why not.
        @async_exception_queue = java.util.concurrent.ArrayBlockingQueue.new(20)
      end
    end
Instance Attribute Details
#label ⇒ Object (readonly)
Returns the value of attribute label.
    # File 'lib/traject/thread_pool.rb', line 46

    def label
      @label
    end
#pool_size ⇒ Object (readonly)
Returns the value of attribute pool_size.
    # File 'lib/traject/thread_pool.rb', line 46

    def pool_size
      @pool_size
    end
#queue_capacity ⇒ Object (readonly)
Returns the value of attribute queue_capacity.
    # File 'lib/traject/thread_pool.rb', line 46

    def queue_capacity
      @queue_capacity
    end
Instance Method Details
#collect_exception(e) ⇒ Object
Thread-safe way of storing an exception, to raise later in a different thread. We don’t guarantee that more than one exception can be stored at a time; only the first one recorded is certain to be kept.

    # File 'lib/traject/thread_pool.rb', line 116

    def collect_exception(e)
      # offer will silently do nothing if the queue is full, that's fine
      # with us.
      @async_exception_queue.offer(e)
    end
#maybe_in_thread_pool ⇒ Object
Pass it a block, which MAYBE gets executed in the background in a thread pool, and maybe gets executed in the calling thread.
There are actually two ‘maybes’:
- If Traject::ThreadPool was configured with a null thread pool, then ALL work will be executed in the calling thread.
- If there is a thread pool, but its work queue is full, then a job will be executed in the calling thread (because we configured our Java thread pool with a limited-size queue and the CallerRunsPolicy rejection strategy).
    # File 'lib/traject/thread_pool.rb', line 87

    def maybe_in_thread_pool
      start_t = Time.now

      if @thread_pool
        @thread_pool.execute do
          begin
            yield
          rescue Exception => e
            collect_exception(e)
          end
        end
      else
        yield
      end

    end
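Because a block may run either in the calling thread or in another thread, calling code should behave the same in both cases. The sketch below illustrates that with a hypothetical helper, `maybe_in_thread`, which is not part of traject and uses a plain Ruby `Thread` where the real class uses a Java executor. The block only touches a `Queue`, which is safe to share across threads.

```ruby
# Hypothetical helper for illustration, not part of traject. Runs the block
# in a new thread when `threaded` is true, otherwise in the calling thread
# (the "null pool" case). Started threads are appended to `threads`.
def maybe_in_thread(threaded, threads)
  if threaded
    threads << Thread.new { yield }
  else
    yield
  end
end

ran_in  = Queue.new # thread-safe, so the block may use it from any thread
threads = []

maybe_in_thread(false, threads) { ran_in << Thread.current } # calling thread
maybe_in_thread(true, threads)  { ran_in << Thread.current } # separate thread
threads.each(&:join)
```

The block is written once and does not need to know where it runs. Any state it shares with the caller goes through a thread-safe object.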
#queue ⇒ Object
Just for monitoring/debugging purposes, we’ll return the work queue used by the threadpool. We don’t recommend doing anything with it; the original java.util.concurrent docs make the same recommendation.

    # File 'lib/traject/thread_pool.rb', line 108

    def queue
      @thread_pool && @thread_pool.queue
    end
#raise_collected_exception! ⇒ Object
If there’s a stored collected exception, raise it again now. Call this to re-raise exceptions caught in other threads in the thread of your choice.
If you call this method on a null ThreadPool (one initialized with nil, so there is no functioning threadpool), it is just a no-op.
    # File 'lib/traject/thread_pool.rb', line 129

    def raise_collected_exception!
      if @async_exception_queue && e = @async_exception_queue.poll
        raise e
      end
    end
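The collect-then-re-raise pattern used by #collect_exception and #raise_collected_exception! can be tried under MRI with a small stand-in. This is a sketch under stated assumptions: `ExceptionCollectorSketch` is a hypothetical class, and Ruby's `SizedQueue` with non-blocking `push`/`pop` stands in for the `offer`/`poll` calls on the Java `ArrayBlockingQueue`.

```ruby
# Hypothetical stand-in for illustration, not Traject::ThreadPool itself.
class ExceptionCollectorSketch
  def initialize(capacity = 20)
    @queue = SizedQueue.new(capacity)
  end

  # Like offer: never blocks; returns false (and drops e) when the queue is full.
  def collect_exception(e)
    @queue.push(e, true) # non-blocking push raises ThreadError when full
    true
  rescue ThreadError
    false
  end

  # Like poll followed by raise: re-raises the oldest stored exception, if any.
  def raise_collected_exception!
    e = begin
      @queue.pop(true) # non-blocking pop raises ThreadError when empty
    rescue ThreadError
      nil
    end
    raise e if e
  end
end

collector = ExceptionCollectorSketch.new

# A worker thread fails; the failure is stored instead of being lost.
Thread.new do
  begin
    raise ArgumentError, "bad record"
  rescue Exception => e
    collector.collect_exception(e)
  end
end.join

begin
  collector.raise_collected_exception! # surfaces in the calling thread
rescue ArgumentError => e
  puts "re-raised in caller: #{e.message}"
end
```

Once the queue is empty, calling `raise_collected_exception!` again does nothing, which matches the no-op behaviour described for the null pool.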
#shutdown_and_wait ⇒ Object
Shut down the threadpool, and wait for all work to complete. This is also a no-op if you have a null ThreadPool that doesn’t really have a threadpool at all.
Returns the elapsed time, in seconds, that the shutdown took.
    # File 'lib/traject/thread_pool.rb', line 140

    def shutdown_and_wait
      start_t = Time.now

      if @thread_pool
        @thread_pool.shutdown
        # We pretty much want to wait forever, although we need to give
        # a timeout. Okay, one day!
        @thread_pool.awaitTermination(1, java.util.concurrent.TimeUnit::DAYS)
      end

      return (Time.now - start_t)
    end
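The "wait with a generous timeout, then report elapsed seconds" shape of this method can be mimicked with plain Ruby threads. The helper below, `wait_for_threads`, is a hypothetical name for this sketch and is not part of traject. It assumes a monotonic clock is preferable for measuring durations, whereas the method above uses `Time.now`.

```ruby
# Hypothetical helper for illustration, not part of traject. Waits for every
# thread to finish, bounded by one overall timeout, and returns the elapsed
# time in seconds as a Float.
def wait_for_threads(threads, timeout_seconds)
  start_t  = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  deadline = start_t + timeout_seconds

  threads.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # join(limit) returns nil if the thread is still running after `limit`.
    t.join([remaining, 0].max)
  end

  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_t
end

workers = Array.new(3) { Thread.new { sleep 0.05 } }
elapsed = wait_for_threads(workers, 5)
puts format("waited %.3f seconds", elapsed)
```

As with the one-day timeout above, the limit here is only a safety net. In normal use the wait ends as soon as the work is done.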