Class: ThreadStorm
Defined in:
- lib/thread_storm.rb
- lib/thread_storm/worker.rb
- lib/thread_storm/sentinel.rb
- lib/thread_storm/execution.rb
Defined Under Namespace
Classes: Execution, Sentinel, Worker
Instance Attribute Summary
- #executions ⇒ Object (readonly)
  Array of executions, in the order they were created by calls to ThreadStorm#execute.
Instance Method Summary
- #busy_workers ⇒ Object
  Returns workers that are currently running executions.
- #clear_executions(method_name = nil, &block) ⇒ Object
  Removes executions stored at ThreadStorm#executions.
- #execute(*args, &block) ⇒ Object
  Creates an execution and schedules it to be run by the thread pool.
- #initialize(options = {}) ⇒ ThreadStorm (constructor)
  Creates the thread pool. Valid options are :size, :timeout, :timeout_method, :default_value, :reraise and :execute_blocks.
- #join ⇒ Object
  Blocks until all pending executions have finished running.
- #shutdown ⇒ Object
  Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.
- #size ⇒ Object
  Returns the configured number of threads (the :size option).
- #threads ⇒ Object
  Returns an array of Ruby threads in the pool.
- #values ⇒ Object
  Calls ThreadStorm#join, then collects the values of each execution.
Constructor Details
#initialize(options = {}) ⇒ ThreadStorm
Valid options are:
- :size => How many threads to spawn. Default is 2.
- :timeout => Maximum time an execution is allowed to run before it is terminated. Default is nil (no timeout).
- :timeout_method => An object that implements something like Timeout.timeout via #call. Default is Timeout.method(:timeout).
- :default_value => Value of an execution if it times out or raises an error. Default is nil.
- :reraise => True if you want exceptions reraised when ThreadStorm#join is called. Default is true.
- :execute_blocks => True if you want #execute to block until a thread is available. Default is false.
    # File 'lib/thread_storm.rb', line 20
    def initialize(options = {})
      @options = options.option_merge :size => 2,
                                      :timeout => nil,
                                      :timeout_method => Timeout.method(:timeout),
                                      :default_value => nil,
                                      :reraise => true,
                                      :execute_blocks => false
      @sentinel = Sentinel.new
      @queue = []
      @executions = []
      @workers = (1..@options[:size]).collect{ Worker.new(@queue, @sentinel, @options) }
      if block_given?
        yield(self)
        join
        shutdown
      end
    end
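The :timeout, :timeout_method and :default_value options interact, so a small stand-alone sketch may help. It uses only Ruby's standard Timeout module; the helper name run_with_fallback is hypothetical, and the way the callable is invoked (seconds as the argument, the work as a block) is an assumption based on the option's description, not ThreadStorm's actual worker code.

```ruby
require 'timeout'

# Hypothetical helper (not part of ThreadStorm): runs the block under a
# Timeout.timeout-like callable and falls back to default_value when the
# block times out or raises.
def run_with_fallback(timeout_method, seconds, default_value)
  if seconds
    timeout_method.call(seconds) { yield }
  else
    yield # nil means no timeout, mirroring the :timeout default
  end
rescue Timeout::Error, StandardError
  default_value
end

fast = run_with_fallback(Timeout.method(:timeout), 1, :default) { 1 + 1 }
slow = run_with_fallback(Timeout.method(:timeout), 0.05, :default) { sleep 1 }

# Any object that responds to #call with the same shape can be swapped in,
# for example a lambda that never enforces the limit.
no_limit = lambda { |_seconds, &work| work.call }
unbounded = run_with_fallback(no_limit, 0.05, :default) { :done }
```

Here fast is 2, slow is :default because the sleep exceeds the limit, and unbounded is :done because the substitute callable ignores the limit.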
Instance Attribute Details
#executions ⇒ Object (readonly)
Array of executions, in the order they were created by calls to ThreadStorm#execute.
    # File 'lib/thread_storm.rb', line 11
    def executions
      @executions
    end
Instance Method Details
#busy_workers ⇒ Object
Returns workers that are currently running executions.
    # File 'lib/thread_storm.rb', line 81
    def busy_workers
      @workers.select{ |worker| worker.busy? }
    end
#clear_executions(method_name = nil, &block) ⇒ Object
Removes executions stored at ThreadStorm#executions. You can selectively remove them by passing in a block or a symbol. The following two lines are equivalent.
storm.clear_executions(:finished?)
storm.clear_executions{ |e| e.finished? }
Because of the nature of threading, the second line below can still return true:
storm.clear_executions(:finished?)
storm.executions.any?{ |e| e.finished? }
Some executions may have finished between the two calls.
    # File 'lib/thread_storm.rb', line 98
    def clear_executions(method_name = nil, &block)
      cleared, @executions = @executions.separate do |execution|
        if block_given?
          yield(execution)
        else
          execution.send(method_name)
        end
      end
      cleared
    end
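The symbol-or-block selection above can be tried without the library. The following is a minimal sketch that assumes Array#separate behaves like Ruby's built-in Array#partition; FakeExecution and clear_from are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for an execution; only #finished? matters here.
FakeExecution = Struct.new(:name, :done) do
  def finished?
    done
  end
end

# Removes matching items from list and returns them. The match is decided
# by the block when one is given, otherwise by sending method_name.
def clear_from(list, method_name = nil)
  cleared, kept = list.partition do |item|
    block_given? ? yield(item) : item.send(method_name)
  end
  list.replace(kept)
  cleared
end

first = [FakeExecution.new(:a, true), FakeExecution.new(:b, false)]
by_symbol = clear_from(first, :finished?)         # removes :a, keeps :b

second = [FakeExecution.new(:a, true), FakeExecution.new(:b, false)]
by_block = clear_from(second) { |e| e.finished? } # same selection via a block
```

Both calls return the finished item and leave only the unfinished one in the original array.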
#execute(*args, &block) ⇒ Object
Creates an execution and schedules it to be run by the thread pool. Return value is a ThreadStorm::Execution.
    # File 'lib/thread_storm.rb', line 44
    def execute(*args, &block)
      Execution.new(args, default_value, &block).tap do |execution|
        @sentinel.synchronize do |e_cond, p_cond|
          e_cond.wait_while{ all_workers_busy? } if execute_blocks?
          @queue << execution
          @executions << execution
          p_cond.signal
        end
      end
    end
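The listing relies on Object#tap so that the new execution is returned after it has been queued under the lock. A reduced sketch of that shape, using Ruby's core Mutex and ConditionVariable, might look like the following; MiniScheduler and Job are hypothetical, and the sketch leaves out workers and the :execute_blocks wait entirely.

```ruby
# Hypothetical miniature scheduler (not ThreadStorm's code). It mirrors the
# shape of #execute: build the job, queue it under a lock, return the job.
class MiniScheduler
  Job = Struct.new(:args, :block)

  attr_reader :queue, :jobs

  def initialize
    @lock = Mutex.new
    @pending = ConditionVariable.new
    @queue = []
    @jobs = []
  end

  def execute(*args, &block)
    Job.new(args, block).tap do |job|
      @lock.synchronize do
        @queue << job    # work waiting for a worker
        @jobs << job     # permanent record, in call order
        @pending.signal  # wakes one waiting worker, if there is one
      end
    end
  end
end

scheduler = MiniScheduler.new
job = scheduler.execute(2, 3) { |a, b| a * b }
result = job.block.call(*job.args) # a worker would normally do this
```

Because tap returns its receiver, the caller gets the same object that was pushed onto both arrays.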
#join ⇒ Object
Blocks until all pending executions have finished running. Reraises any exceptions caused by executions unless :reraise => false was passed to ThreadStorm.new.
    # File 'lib/thread_storm.rb', line 57
    def join
      @executions.each do |execution|
        execution.join
        raise execution.exception if execution.exception and reraise?
      end
    end
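The effect of :reraise can be seen in a small stand-alone sketch. CapturingTask and join_all are hypothetical names; the sketch assumes an execution stores the exception raised by its block, which is what the exception accessor in the listing suggests.

```ruby
# Hypothetical task wrapper: captures the block's exception instead of
# letting it end the thread unnoticed.
class CapturingTask
  attr_reader :value, :exception

  def initialize(&block)
    @thread = Thread.new do
      begin
        @value = block.call
      rescue StandardError => e
        @exception = e
      end
    end
  end

  def join
    @thread.join
    self
  end
end

# Waits for every task in order and re-raises the first captured
# exception when reraise is true.
def join_all(tasks, reraise = true)
  tasks.each do |task|
    task.join
    raise task.exception if task.exception && reraise
  end
end

tasks = [CapturingTask.new { 1 }, CapturingTask.new { raise ArgumentError, 'boom' }]
join_all(tasks, false) # no raise; the exception stays on the task
begin
  join_all(tasks)      # re-raises the captured ArgumentError
rescue ArgumentError => e
  message = e.message
end
```

With reraise disabled the caller has to inspect each task's exception itself; with it enabled the first failure, in creation order, surfaces at the join call.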
#shutdown ⇒ Object
Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.
    # File 'lib/thread_storm.rb', line 71
    def shutdown
      @sentinel.synchronize do |e_cond, p_cond|
        @queue.replace([:die] * size)
        p_cond.broadcast
      end
      @workers.each{ |worker| worker.thread.join }
      true
    end
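Replacing the queue with one :die marker per worker is a form of the poison-pill shutdown pattern. The sketch below reproduces the idea with Ruby's thread-safe Queue rather than the Array plus Sentinel used in the listing; start_workers and shutdown_workers are hypothetical helpers.

```ruby
# Starts count threads that run callables popped from queue until each
# one receives the :die marker.
def start_workers(queue, count, results)
  Array.new(count) do
    Thread.new do
      loop do
        job = queue.pop         # blocks while the queue is empty
        break if job == :die
        results << job.call
      end
    end
  end
end

# Drops pending work, sends one :die per worker, then waits for them.
def shutdown_workers(queue, workers)
  queue.clear
  workers.size.times { queue << :die }
  workers.each(&:join)
  true
end

queue = Queue.new
results = Queue.new
workers = start_workers(queue, 2, results)
stopped = shutdown_workers(queue, workers) # blocks until both threads exit
```

A job that a worker has already popped still runs to completion; only work that is still queued is discarded.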
#size ⇒ Object
Returns the configured number of threads (the :size option).
    # File 'lib/thread_storm.rb', line 38
    def size #:nodoc:
      @options[:size]
    end
#threads ⇒ Object
Returns an array of Ruby threads in the pool.
    # File 'lib/thread_storm.rb', line 86
    def threads
      @workers.collect{ |worker| worker.thread }
    end
#values ⇒ Object
Calls ThreadStorm#join, then collects the values of each execution.
    # File 'lib/thread_storm.rb', line 65
    def values
      join and @executions.collect{ |execution| execution.value }
    end
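The join and ... idiom works because #join returns the result of Array#each, which is the array itself and therefore truthy even when empty. A minimal sketch with plain Ruby threads follows; ValueCollector is a hypothetical class, not part of ThreadStorm.

```ruby
# Hypothetical pool-like object showing the `join and collect` idiom.
class ValueCollector
  def initialize(blocks)
    @threads = blocks.map { |work| Thread.new(&work) }
  end

  # Array#each returns its receiver, so this is always truthy.
  def join
    @threads.each { |thread| thread.join }
  end

  # Waits for every thread, then gathers results in creation order.
  def values
    join and @threads.collect { |thread| thread.value }
  end
end

collector = ValueCollector.new([-> { 1 + 1 }, -> { :two }])
results = collector.values
```

Results come back in the order the work was submitted, regardless of which thread finished first.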