Class: ThreadStorm

Inherits:
Object show all
Defined in:
lib/thread_storm.rb,
lib/thread_storm/worker.rb,
lib/thread_storm/sentinel.rb,
lib/thread_storm/execution.rb

Defined Under Namespace

Classes: Execution, Sentinel, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ThreadStorm

Valid options are

:size => How many threads to spawn.  Default is 2.
:timeout => Max time an execution is allowed to run before terminating it.  Default is nil (no timeout).
:timeout_method => An object that implements something like Timeout.timeout via #call.  Default is Timeout.method(:timeout).
:default_value => Value of an execution if it times out or errors.  Default is nil.
:reraise => True if you want exceptions reraised when ThreadStorm#join is called.  Default is true.
:execute_blocks => True if you want #execute to block until there is an available thread.  Default is false.


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/thread_storm.rb', line 20

def initialize(options = {})
  @options = options.option_merge :size => 2,
                                  :timeout => nil,
                                  :timeout_method => Timeout.method(:timeout),
                                  :default_value => nil,
                                  :reraise => true,
                                  :execute_blocks => false
  @sentinel = Sentinel.new
  @queue = []
  @executions = []
  @workers = (1..@options[:size]).collect{ Worker.new(@queue, @sentinel, @options) }
  if block_given?
    yield(self)
    join
    shutdown
  end
end

Instance Attribute Details

#executionsObject (readonly)

Array of executions in order as they are defined by calls to ThreadStorm#execute.



11
12
13
# File 'lib/thread_storm.rb', line 11

def executions
  @executions
end

Instance Method Details

#busy_workersObject

Returns workers that are currently running executions.



81
82
83
# File 'lib/thread_storm.rb', line 81

def busy_workers
  @workers.select{ |worker| worker.busy? }
end

#clear_executions(method_name = nil, &block) ⇒ Object

Removes executions stored at ThreadStorm#executions. You can selectively remove them by passing in a block or a symbol. The following two lines are equivalent.

storm.clear_executions(:finished?)
storm.clear_executions{ |e| e.finished? }

Because of the nature of threading, the following code could happen:

storm.clear_executions(:finished?)
storm.executions.any?{ |e| e.finished? }

Some executions could have finished between the two calls.



98
99
100
101
102
103
104
105
106
107
# File 'lib/thread_storm.rb', line 98

def clear_executions(method_name = nil, &block)
  cleared, @executions = @executions.separate do |execution|
    if block_given?
      yield(execution)
    else
      execution.send(method_name)
    end
  end
  cleared
end

#execute(*args, &block) ⇒ Object

Creates an execution and schedules it to be run by the thread pool. Return value is a ThreadStorm::Execution.



44
45
46
47
48
49
50
51
52
53
# File 'lib/thread_storm.rb', line 44

def execute(*args, &block)
  Execution.new(args, default_value, &block).tap do |execution|
    @sentinel.synchronize do |e_cond, p_cond|
      e_cond.wait_while{ all_workers_busy? } if execute_blocks?
      @queue << execution
      @executions << execution
      p_cond.signal
    end
  end
end

#joinObject

Block until all pending executions are finished running. Reraises any exceptions caused by executions unless :reraise => false was passed to ThreadStorm#new.



57
58
59
60
61
62
# File 'lib/thread_storm.rb', line 57

def join
  @executions.each do |execution|
    execution.join
    raise execution.exception if execution.exception and reraise?
  end
end

#shutdownObject

Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.



71
72
73
74
75
76
77
78
# File 'lib/thread_storm.rb', line 71

def shutdown
  @sentinel.synchronize do |e_cond, p_cond|
    @queue.replace([:die] * size)
    p_cond.broadcast
  end
  @workers.each{ |worker| worker.thread.join }
  true
end

#sizeObject

:nodoc:



38
39
40
# File 'lib/thread_storm.rb', line 38

def size #:nodoc:
  @options[:size]
end

#threadsObject

Returns an array of Ruby threads in the pool.



86
87
88
# File 'lib/thread_storm.rb', line 86

def threads
  @workers.collect{ |worker| worker.thread }
end

#valuesObject

Calls ThreadStorm#join, then collects the values of each execution.



65
66
67
# File 'lib/thread_storm.rb', line 65

def values
  join and @executions.collect{ |execution| execution.value }
end