Class: ThreadExecutor::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_executor/executor.rb

Overview

A threaded executor.

Procs can be given to this executor and are executed according to the availability of threads.

Use

# Make an executor.
executor = ThreadExecutor::Executor.new 10

begin

  # Dispatch 100 jobs across the 10 threads.
  futures = 100.times.map { executor.call { do_long_running_work } }

  # Collect the results.
  results = futures.map { |future| future.value }

ensure
  # Clean up the threads.
  executor.finish
end

Instance Method Summary collapse

Constructor Details

#initialize(size = 2) ⇒ Executor

Build a new executor with size Processor objects. The default size is 2.

Each Processor contains a work queue and a running ruby Thread which will process the elements in the work queue.

This Executor will insert elements into the work queue.

Use of the Executor is not thread safe. If more than one thread submit work to Processor objects through this Executor, the Executor must be protected by a lock of some sort.



68
69
70
71
72
73
74
# File 'lib/thread_executor/executor.rb', line 68

def initialize size=2
  @processors = []

  size.times do
    @processors << Processor.new
  end
end

Instance Method Details

#call(&t) ⇒ Object

Enqueues the block in a processor queue with the fewest tasks.

Returns a future for the result.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/thread_executor/executor.rb', line 79

def call(&t)
  min_processor = @processors[0]
  min_size = min_processor.size
  @processors.each do |p|
    min_size2 = p.size
    if min_size > min_size2
      min_processor = p
      min_size = min_size2
    end
  end

  # Forward the user's block to the processor.
  min_processor.call &t
end

#finishObject

Shutdown and join all worker threads.



100
101
102
103
104
# File 'lib/thread_executor/executor.rb', line 100

def finish
  @processors.each do |p|
    p.finish
  end
end

#sizeObject

Sum of all queue depths.



95
96
97
# File 'lib/thread_executor/executor.rb', line 95

def size
  @processors.reduce(0) {|x,y| x + y.size}
end