thread_storm
Simple thread pool with a few advanced features.
Features
* execution state
* timeouts and configurable timeout implementation
* graceful error handling
* unit tests
Example
storm = ThreadStorm.new :size => 2
storm.execute{ sleep(0.01); "a" }
storm.execute{ sleep(0.01); "b" }
storm.execute{ sleep(0.01); "c" }
storm.join # Should return in about 0.02 seconds... ;)
storm.values # ["a", "b", "c"]
Execution state
You can query the state of an execution.
storm = ThreadStorm.new :size => 2
execution = storm.execute{ sleep(0.01); "a" }
storm.execute{ sleep(0.01); "b" }
storm.execute{ sleep(0.01); "c" }
storm.join
execution.started? # true
execution.finished? # true
execution.timed_out? # false
execution.duration # ~0.01
execution.value # "a"
Timeouts
You can restrict how long executions are allowed to run for.
storm = ThreadStorm.new :size => 2, :timeout => 0.02, :default_value => "failed"
storm.execute{ sleep(0.01); "a" }
storm.execute{ sleep(0.03); "b" }
storm.execute{ sleep(0.01); "c" }
storm.join
storm.executions[1].started? # true
storm.executions[1].finished? # true
storm.executions[1].timed_out? # true
storm.executions[1].duration # ~0.02
storm.executions[1].value # "failed"
Error handling
If an execution causes an exception, it will be reraised when ThreadStorm#join (or any other method that calls ThreadStorm#join) is called, unless you pass :reraise => false to ThreadStorm#new. The exception is stored in ThreadStorm::Execution#exception.
storm = ThreadStorm.new :size => 2, :reraise => false, :default_value => "failure"
execution = storm.execute{ raise("busted"); "a" }
storm.join
execution.value # "failure"
execution.exception # RuntimeError: busted
Joining vs shutting down
ThreadStorm#join blocks until all pending executions are done running. It does not actually kill the thread storm’s worker threads (incase you want to do more work). ThreadStorm#shutdown actually kills the worker threads.
Sometimes it can be a pain to remember to call #shutdown, so as a convenience, you can pass a block to ThreadStorm#new and #join and #shutdown will be called for you.
storm = ThreadStorm.new do |s|
s.execute{ "a" }
s.execute{ "b" }
s.execute{ "c" }
end
# At this point, #join and #shutdown have been called.
storm.values # ["a", "b", "c"]
Configurable timeout method
Timeout.timeout is unreliable in MRI 1.8.x. To address this, you can have ThreadStorm use an alternative implementation.
require "system_timer"
storm = ThreadStorm.new :timeout_method => SystemTimer.method(:timeout) do
...
end
The :timeout_method option takes any callable object (i.e. responds_to?(:call)) that implements something similar to Timeout.timeout (i.e. takes the same arguments and raises Timeout::Error).
require "system_timer"
storm = ThreadStorm.new :timeout_method => Proc.new{ |seconds, &block| SystemTimer.timeout(seconds, &block) }
...
end
Copyright
Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.