Class: ThreadExecutor::Executor
- Inherits: Object
  - Object
    - ThreadExecutor::Executor
- Defined in: lib/thread_executor/executor.rb
Overview
A threaded executor.
Procs can be given to this executor and are executed according to the availability of threads.
Use
    # Make an executor.
    executor = ThreadExecutor::Executor.new 10
    begin
      # Dispatch 100 jobs across the 10 threads.
      futures = 100.times.map { executor.call { do_long_running_work } }
      # Collect the results.
      results = futures.map { |future| future.value }
    ensure
      # Clean up the threads.
      executor.finish
    end
Instance Method Summary
- #call(&t) ⇒ Object
  Enqueues the block in a processor queue with the fewest tasks.
- #finish ⇒ Object
  Shut down and join all worker threads.
- #initialize(size = 2) ⇒ Executor (constructor)
  Build a new executor with size Processor objects.
- #size ⇒ Object
  Sum of all queue depths.
Constructor Details
#initialize(size = 2) ⇒ Executor
Build a new executor with size Processor objects. The default size is 2.
Each Processor contains a work queue and a running Ruby Thread that processes the elements in that queue.
The Executor inserts elements into these work queues.
Use of the Executor is not thread safe. If more than one thread submits work to Processor objects through this Executor, the Executor must be protected by a lock of some sort.
    # File 'lib/thread_executor/executor.rb', line 68
    def initialize size=2
      @processors = []
      size.times do
        @processors << Processor.new
      end
    end
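The thread-safety note above asks for "a lock of some sort" when several threads submit work. One way this might look is a thin wrapper that serializes submissions with a Mutex. This is only a sketch: LockedExecutor is a hypothetical name that is not part of the library, and it assumes nothing about the wrapped object beyond the #call and #finish methods documented on this page.

```ruby
# Hypothetical wrapper (not part of thread_executor): serializes access to
# any executor-like object that responds to #call(&block) and #finish.
class LockedExecutor
  def initialize(executor)
    @executor = executor
    @lock = Mutex.new
  end

  # Only one thread at a time may hand a block to the wrapped executor.
  # Whatever the wrapped #call returns (a future, per this page) is passed back.
  def call(&block)
    @lock.synchronize { @executor.call(&block) }
  end

  # Shutting down also takes the lock so it cannot interleave with a submission.
  def finish
    @lock.synchronize { @executor.finish }
  end
end
```

Under these assumptions, producer threads would share one LockedExecutor.new(ThreadExecutor::Executor.new(10)) and call it in place of the bare executor. The lock is held only while a block is enqueued, not while the block runs.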
Instance Method Details
#call(&t) ⇒ Object
Enqueues the block in a processor queue with the fewest tasks.
Returns a future for the result.
    # File 'lib/thread_executor/executor.rb', line 79
    def call(&t)
      min_processor = @processors[0]
      min_size = min_processor.size
      @processors.each do |p|
        min_size2 = p.size
        if min_size > min_size2
          min_processor = p
          min_size = min_size2
        end
      end
      # Forward the user's block to the processor.
      min_processor.call &t
    end
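The loop in #call scans every processor and keeps the one with the smallest queue depth. The selection step on its own can be sketched with Enumerable#min_by. FakeProcessor and least_loaded below are hypothetical stand-ins for illustration; the real Processor class is not shown on this page, and the library itself uses the explicit loop above.

```ruby
# Stand-in for a Processor: only the queue depth (#size) matters here.
FakeProcessor = Struct.new(:name, :size)

# Pick the processor whose queue currently holds the fewest tasks.
def least_loaded(processors)
  processors.min_by(&:size)
end

processors = [
  FakeProcessor.new('a', 3),
  FakeProcessor.new('b', 1),
  FakeProcessor.new('c', 2)
]

puts least_loaded(processors).name
```

Because each submission reads every queue depth before choosing, the cost of a submission grows linearly with the number of processors, which is usually negligible for small pools.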
#finish ⇒ Object
Shut down and join all worker threads.
    # File 'lib/thread_executor/executor.rb', line 100
    def finish
      @processors.each do |p|
        p.finish
      end
    end
#size ⇒ Object
Sum of all queue depths.
    # File 'lib/thread_executor/executor.rb', line 95
    def size
      @processors.reduce(0) {|x,y| x + y.size}
    end