thread_storm
Simple thread pool with a few advanced features.
Features
* execution state querying
* timeouts and configurable timeout implementation
* graceful error handling
* unit tests
Example
pool = ThreadStorm.new :size => 2
pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.01); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join # Should return in about 0.02 seconds... ;)
pool.values # ["a", "b", "c"]
Execution state
You can query the state of an execution.
pool = ThreadStorm.new :size => 2
execution = pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.01); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join
execution.started? # true
execution.finished? # true
execution.timed_out? # false
execution.duration # ~0.01
execution.value # "a"
Timeouts
You can restrict how long executions are allowed to run for.
pool = ThreadStorm.new :size => 2, :timeout => 0.02, :default_value => "failed"
pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.03); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join
pool.executions[1].started? # true
pool.executions[1].finished? # true
pool.executions[1].timed_out? # true
pool.executions[1].duration # ~0.02
pool.executions[1].value # "failed"
Error handling
If an execution causes an exception, it will be reraised when ThreadStorm#join (or any other method that calls ThreadStorm#join) is called, unless you pass :reraise => false
to ThreadStorm#new. The exception is stored in ThreadStorm::Execution#exception.
pool = ThreadStorm.new :size => 2, :reraise => false, :default_value => "failure"
execution = pool.execute{ raise("busted"); "a" }
pool.join
execution.value # "failure"
execution.exception # RuntimeError: busted
Joining vs shutting down
ThreadStorm#join blocks until all pending executions are done running. It does not actually kill the thread storm’s worker threads (incase you want to do more work). ThreadStorm#shutdown actually kills the worker threads.
Sometimes it can be a pain to remember to call #shutdown, so as a convenience, you can pass a block to ThreadStorm#new and #join and #shutdown will be called for you.
party = ThreadStorm.new do |p|
p.execute{ "a" }
p.execute{ "b" }
p.execute{ "c" }
end
# At this point, #join and #shutdown have been called.
party.values # ["a", "b", "c"]
Configurable timeout method
Timeout.timeout
is unreliable in MRI 1.8.x. To address this, you can have ThreadStorm use an alternative implementation.
require "system_timer"
party = ThreadStorm.new :timeout_method => SystemTimer.method(:timeout) do
...
end
The :timeout_method
option takes any callable object (i.e. responds_to?(:call)
) that implements something similar to Timeout.timeout
(i.e. takes the same arguments and raises Timeout::Error
).
require "system_timer"
party = ThreadStorm.new :timeout_method => Proc.new{ |seconds, &block| SystemTimer.timeout(seconds, &block) }
...
end
Copyright
Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.