thread_storm

Simple thread pool with a few advanced features.

Features

* execution state querying
* timeouts and configurable timeout implementation
* graceful error handling
* unit tests

Example

pool = ThreadStorm.new :size => 2
pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.01); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join # Should return in about 0.02 seconds... ;)
pool.values # ["a", "b", "c"]

Execution state

You can query the state of an execution.

pool = ThreadStorm.new :size => 2
execution = pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.01); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join
execution.started?   # true
execution.finished?  # true
execution.timed_out? # false
execution.duration   # ~0.01
execution.value      # "a"

Timeouts

You can limit how long executions are allowed to run.

pool = ThreadStorm.new :size => 2, :timeout => 0.02, :default_value => "failed"
pool.execute{ sleep(0.01); "a" }
pool.execute{ sleep(0.03); "b" }
pool.execute{ sleep(0.01); "c" }
pool.join
pool.executions[1].started?   # true
pool.executions[1].finished?  # true
pool.executions[1].timed_out? # true
pool.executions[1].duration   # ~0.02
pool.executions[1].value      # "failed"
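
The timeout behavior above can be approximated with nothing but the standard library. The following is a minimal sketch, not ThreadStorm's actual code: run_with_timeout is a hypothetical helper that wraps a block in Timeout.timeout and falls back to a default value, roughly mirroring what :timeout and :default_value do.

```ruby
require "timeout"

# Hypothetical helper (not part of ThreadStorm): run a block with a time
# limit and report the value, whether it timed out, and how long it took.
def run_with_timeout(limit, default_value)
  started   = Time.now
  timed_out = false
  value = begin
    Timeout.timeout(limit) { yield }
  rescue Timeout::Error
    timed_out = true
    default_value          # the block overran, so use the fallback
  end
  { :value => value, :timed_out => timed_out, :duration => Time.now - started }
end

fast = run_with_timeout(0.5, "failed")  { sleep(0.01); "a" }
slow = run_with_timeout(0.05, "failed") { sleep(1); "b" }

fast[:value]     # "a"
slow[:value]     # "failed"
slow[:timed_out] # true
```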

Error handling

If an execution raises an exception, it is reraised when ThreadStorm#join (or any other method that calls ThreadStorm#join) is called, unless you pass :reraise => false to ThreadStorm.new. The exception is stored in ThreadStorm::Execution#exception.

pool = ThreadStorm.new :size => 2, :reraise => false, :default_value => "failure"
execution = pool.execute{ raise("busted"); "a" }
pool.join
execution.value # "failure"
execution.exception # RuntimeError: busted
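
To make the reraise behavior concrete, here is a small sketch of one way a worker could capture an exception and surface it at join time. SketchExecution is a hypothetical stand-in written for illustration; it is an assumption about the general technique, not a copy of ThreadStorm::Execution.

```ruby
# Hypothetical stand-in for an execution: runs a block in a thread,
# stores any exception, and keeps a default value if the block fails.
class SketchExecution
  attr_reader :value, :exception

  def initialize(default_value, &block)
    @value     = default_value
    @exception = nil
    @thread = Thread.new do
      begin
        @value = block.call
      rescue StandardError => e
        @exception = e   # keep the error instead of losing it in the thread
      end
    end
  end

  # Wait for the block to finish; optionally reraise what it raised.
  def join(reraise = true)
    @thread.join
    raise @exception if reraise && @exception
    self
  end
end

quiet = SketchExecution.new("failure") { raise "busted" }.join(false)
quiet.value      # "failure"
quiet.exception  # the stored RuntimeError

loud = SketchExecution.new("failure") { raise "busted" }
message = begin
  loud.join      # reraises the stored exception in the calling thread
rescue RuntimeError => e
  e.message
end
```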

Joining vs shutting down

ThreadStorm#join blocks until all pending executions are done running. It does not actually kill the thread storm’s worker threads (in case you want to do more work). ThreadStorm#shutdown actually kills the worker threads.

Remembering to call #shutdown can be a pain, so as a convenience you can pass a block to ThreadStorm.new; #join and #shutdown are then called for you.

party = ThreadStorm.new do |p|
  p.execute{ "a" }
  p.execute{ "b" }
  p.execute{ "c" }
end
# At this point, #join and #shutdown have been called.
party.values # ["a", "b", "c"]
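
The difference between joining and shutting down may be easier to see in a toy pool. The sketch below is a hypothetical TinyPool built on Queue and Thread from the standard library. It only illustrates the general idea (join waits for work, shutdown stops the workers) and makes no claim about how ThreadStorm is implemented.

```ruby
require "thread"

# Toy pool (hypothetical, not ThreadStorm's code) showing join vs shutdown.
class TinyPool
  def initialize(size)
    @queue   = Queue.new
    @lock    = Mutex.new
    @idle    = ConditionVariable.new
    @pending = 0
    @workers = Array.new(size) do
      Thread.new do
        # Each worker loops until it receives the :stop sentinel.
        while (job = @queue.pop) != :stop
          job.call
          @lock.synchronize do
            @pending -= 1
            @idle.broadcast if @pending.zero?
          end
        end
      end
    end
  end

  def execute(&block)
    @lock.synchronize { @pending += 1 }
    @queue << block
  end

  # Block until every queued job has run; the workers stay alive.
  def join
    @lock.synchronize { @idle.wait(@lock) until @pending.zero? }
  end

  # Stop the workers; the pool cannot be reused afterwards.
  def shutdown
    @workers.each { @queue << :stop }
    @workers.each(&:join)
  end

  def alive_workers
    @workers.count(&:alive?)
  end
end

pool    = TinyPool.new(2)
results = Queue.new
3.times { |i| pool.execute { results << i } }
pool.join
alive_after_join = pool.alive_workers      # workers are still waiting for jobs
pool.execute { results << 3 }              # so more work can be queued
pool.join
pool.shutdown
alive_after_shutdown = pool.alive_workers  # workers have exited
```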

Configurable timeout method

Timeout.timeout is unreliable in MRI 1.8.x. To address this, you can have ThreadStorm use an alternative implementation.

require "system_timer"
party = ThreadStorm.new :timeout_method => SystemTimer.method(:timeout) do
  ...
end

The :timeout_method option takes any callable object (i.e. one for which respond_to?(:call) is true) that behaves like Timeout.timeout: it takes the same arguments and raises Timeout::Error when the time limit is exceeded.

require "system_timer"
party = ThreadStorm.new :timeout_method => Proc.new{ |seconds, &block| SystemTimer.timeout(seconds, &block) } do
  ...
end
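
A callable does not have to be a Proc or a Method. As a rough illustration, the sketch below defines a hypothetical LoggingTimeout class whose #call takes a number of seconds and a block, delegates to the standard library's Timeout.timeout, and so raises Timeout::Error on overrun. The class name and the logging are assumptions made for the example.

```ruby
require "timeout"

# Hypothetical callable that mimics Timeout.timeout: it takes a number of
# seconds and a block, and raises Timeout::Error if the block overruns.
class LoggingTimeout
  attr_reader :calls

  def initialize
    @calls = []
  end

  def call(seconds, &block)
    @calls << seconds                 # record each requested limit
    Timeout.timeout(seconds, &block)  # delegate to the stdlib implementation
  end
end

timeout_method = LoggingTimeout.new

fast = timeout_method.call(1) { "done" }

slow = begin
  timeout_method.call(0.05) { sleep(1); "never" }
rescue Timeout::Error
  "timed out"
end
```

Because such an object responds to call with a seconds argument and a block, it should be usable as the value of :timeout_method, assuming ThreadStorm invokes it that way.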

Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.