Class: Chef::ChefFS::Parallelizer

Inherits:
Object
  • Object
show all
Defined in:
lib/chef/chef_fs/parallelizer.rb,
lib/chef/chef_fs/parallelizer/flatten_enumerable.rb,
lib/chef/chef_fs/parallelizer/parallel_enumerable.rb

Overview

Tries to balance several guarantees, in order of priority:

  • don’t get deadlocked

  • provide results in desired order

  • provide results as soon as they are available

  • process input as soon as possible

Defined Under Namespace

Classes: FlattenEnumerable, ParallelEnumerable

Constant Summary collapse

@@parallelizer =
nil
@@threads =
0

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_threads) ⇒ Parallelizer

Returns a new instance of Parallelizer.



32
33
34
35
36
37
# File 'lib/chef/chef_fs/parallelizer.rb', line 32

def initialize(num_threads)
  @tasks = Queue.new
  @threads = []
  @stop_thread = {}
  resize(num_threads)
end

Class Method Details

.parallel_do(enumerable, options = {}, &block) ⇒ Object



28
29
30
# File 'lib/chef/chef_fs/parallelizer.rb', line 28

def self.parallel_do(enumerable, options = {}, &block)
  parallelizer.parallel_do(enumerable, options, &block)
end

.parallelize(enumerable, options = {}, &block) ⇒ Object



24
25
26
# File 'lib/chef/chef_fs/parallelizer.rb', line 24

def self.parallelize(enumerable, options = {}, &block)
  parallelizer.parallelize(enumerable, options, &block)
end

.parallelizerObject



20
21
22
# File 'lib/chef/chef_fs/parallelizer.rb', line 20

def self.parallelizer
  @@parallelizer ||= Parallelizer.new(@@threads)
end

.threads=(value) ⇒ Object



15
16
17
18
# File 'lib/chef/chef_fs/parallelizer.rb', line 15

def self.threads=(value)
  @@threads = value
  @@parallelizer.resize(value) if @@parallelizer
end

Instance Method Details

#killObject



78
79
80
81
82
83
84
# File 'lib/chef/chef_fs/parallelizer.rb', line 78

def kill
  @threads.each do |thread|
    Thread.kill(thread)
    @stop_thread.delete(thread)
  end
  @threads = []
end

#num_threadsObject



39
40
41
# File 'lib/chef/chef_fs/parallelizer.rb', line 39

def num_threads
  @threads.size
end

#parallel_do(enumerable, options = {}, &block) ⇒ Object



47
48
49
# File 'lib/chef/chef_fs/parallelizer.rb', line 47

def parallel_do(enumerable, options = {}, &block)
  ParallelEnumerable.new(@tasks, enumerable, options.merge(ordered: false), &block).wait
end

#parallelize(enumerable, options = {}, &block) ⇒ Object



43
44
45
# File 'lib/chef/chef_fs/parallelizer.rb', line 43

def parallelize(enumerable, options = {}, &block)
  ParallelEnumerable.new(@tasks, enumerable, options, &block)
end

#resize(to_threads, wait = true, timeout = nil) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/chef/chef_fs/parallelizer.rb', line 55

def resize(to_threads, wait = true, timeout = nil)
  if to_threads < num_threads
    threads_to_stop = @threads[to_threads..num_threads - 1]
    @threads = @threads.slice(0, to_threads)
    threads_to_stop.each do |thread|
      @stop_thread[thread] = true
    end

    if wait
      start_time = Time.now
      threads_to_stop.each do |thread|
        thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
        thread.join(thread_timeout)
      end
    end

  else
    num_threads.upto(to_threads - 1) do |i|
      @threads[i] = Thread.new(&method(:worker_loop))
    end
  end
end

#stop(wait = true, timeout = nil) ⇒ Object



51
52
53
# File 'lib/chef/chef_fs/parallelizer.rb', line 51

def stop(wait = true, timeout = nil)
  resize(0, wait, timeout)
end