thread_storm

Simple thread pool with a few advanced features.

Features

Some notable features.

* execution state querying
* timeouts and configurable timeout implementation
* graceful error handling
* unit tests

Example

A simple example to get you started.

storm = ThreadStorm.new :size => 2
storm.execute{ sleep(0.01); "a" }
storm.execute{ sleep(0.01); "b" }
storm.execute{ sleep(0.01); "c" }
storm.join # Should return in about 0.02 seconds... ;)
storm.values # ["a", "b", "c"]

Execution state

You can query the state of an execution.

storm = ThreadStorm.new :size => 2
execution = storm.execute{ sleep(0.01); "a" }
execution.join
execution.finished? # true

An execution can be in one of 4 states at any given time: initialized, queued, started, finished

Initialized means the execution has been created, but not yet scheduled to be run by the thread pool (i.e. ThreadStorm#execute hasn’t been called on it yet).

Queued means the execution has been scheduled to run, but there are no free threads available to run it yet.

Started means that it is currently running on a thread.

Finished means it has completed running.

Execution status

You can query the status of an execution.

storm = ThreadStorm.new :size => 2
execution = storm.execute{ sleep(0.01); "a" }
execution.join
execution.success? # true
execution.failure? # false
execution.timeout? # false

An execution can have one of three statuses after it has entered the finished state: success, failure, timeout

Success means it finished without raising an exception.

Failure means it raised an exception.

Timeout means it ran longer than the timeout limit and was aborted.

Timeouts

You can restrict how long executions are allowed to run for.

storm = ThreadStorm.new :size => 2, :timeout => 0.02
execution = storm.execute{ sleep(0.03); "b" }
execution.join
execution.finished?  # true
execution.timeout?   # true
executions.duration   # ~0.02

Error handling

If an execution causes an exception, it will be reraised when ThreadStorm#join (or any other method that calls ThreadStorm#join) is called, unless you pass :reraise => false to ThreadStorm#new. The exception is stored in ThreadStorm::Execution#exception.

storm = ThreadStorm.new :size => 2, :reraise => false, :default_value => "failure"
execution = storm.execute{ raise("busted"); "a" }
execution.join
execution.failure?  # true
execution.value     # "failure"
execution.exception # RuntimeError: busted

Joining vs shutting down

ThreadStorm#join blocks until all pending executions are done running. It does not actually kill the thread storm’s worker threads (incase you want to do more work). ThreadStorm#shutdown actually kills the worker threads.

Sometimes it can be a pain to remember to call #shutdown, so as a convenience, you can pass a block to ThreadStorm#new and #join and #shutdown will be called for you.

storm = ThreadStorm.new do |s|
  s.execute{ "a" }
  s.execute{ "b" }
  s.execute{ "c" }
end
# At this point, #join and #shutdown have been called.
storm.values # ["a", "b", "c"]

Configurable timeout method

Timeout.timeout is unreliable in MRI 1.8. To address this, you can have ThreadStorm use an alternative implementation.

require "system_timer"
storm = ThreadStorm.new :timeout_method => SystemTimer.method(:timeout) do
  ...
end

The :timeout_method option takes any callable object that has the same signature as Timeout.timeout.

require "system_timer"
storm = ThreadStorm.new :timeout_method => Proc.new{ |seconds, &block| SystemTimer.timeout(seconds, &block) }
  ...
end

Caveats and Gotchas

This is tricky…

ThreadStorm.new do |s|
  s.execute{ raise RuntimeError }
  begin
    s.join
  rescue RuntimeError => e
    puts "execution failed"
  end
end

This will still raise an exception because ThreadStorm#join will be called again after the block is finished. This same problem happens with ThreadStorm#run.

Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.