Class: Parallelizer

Inherits:
Object
  • Object
show all
Defined in:
lib/parallelizer.rb

Defined Under Namespace

Classes: Computation, RejectedExecutionError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ops = {}) ⇒ Parallelizer

:core_pool_threads, :max_pool_threads, :keep_alive_time, :max_acceptable_delay, :delayed_too_long_proc, :prestart_all_core_threads



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/parallelizer.rb', line 26

def initialize ops={} #:core_pool_threads, :max_pool_threads, :keep_alive_time, :max_acceptable_delay, :delayed_too_long_proc, :prestart_all_core_threads
core_pool_threads = ops[:core_pool_threads] || 10
  max_pool_threads = ops[:max_pool_threads] || 10
  raise "Parallelizer core_pool_threads greater than max_pool_threads!" if core_pool_threads > max_pool_threads

  @pool = ThreadPoolExecutor.new(core_pool_threads,
                                 max_pool_threads,
                                 ops[:keep_alive_time] || 60,
                                 TimeUnit::SECONDS,
                                 LinkedBlockingQueue.new,
                                 DefaultDaemonThreadFactory.new)

  @max_acceptable_delay = ops[:max_acceptable_delay]
  @delayed_too_long_proc = ops[:delayed_too_long_proc]

  prestart_all_core_threads if ops[:prestart_all_core_threads]
end

Instance Attribute Details

#delayed_too_long_procObject

Returns the value of attribute delayed_too_long_proc.



24
25
26
# File 'lib/parallelizer.rb', line 24

def delayed_too_long_proc
  @delayed_too_long_proc
end

#max_acceptable_delayObject

Returns the value of attribute max_acceptable_delay.



24
25
26
# File 'lib/parallelizer.rb', line 24

def max_acceptable_delay
  @max_acceptable_delay
end

Instance Method Details

#await_termination(seconds) ⇒ Object



52
53
54
# File 'lib/parallelizer.rb', line 52

def await_termination(seconds)
  @pool.await_termination(seconds, TimeUnit::SECONDS)
end

#map(enumerator, ops = {}, &proc) ⇒ Object

works like a normal map, but in parallel, and also if an exception is raised that exception will be stored in that index instead of the result



58
59
60
# File 'lib/parallelizer.rb', line 58

def map enumerator, ops={}, &proc
  run_computation_array(enumerator.map {|arg| Computation.new(self, proc, arg) }, ops)
end

#prestart_all_core_threadsObject



44
45
46
# File 'lib/parallelizer.rb', line 44

def prestart_all_core_threads
  @pool.prestartAllCoreThreads
end

#run(array, ops = {}) ⇒ Object

expects an array of procs



63
64
65
# File 'lib/parallelizer.rb', line 63

def run array, ops={}
  run_computation_array(array.map {|proc| Computation.new(self, proc) }, ops)
end

#shutdownObject



48
49
50
# File 'lib/parallelizer.rb', line 48

def shutdown
  @pool.shutdown
end