Class: Chef::ChefFS::Parallelizer::ParallelEnumerable

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/chef/chef_fs/parallelizer/parallel_enumerable.rb

Defined Under Namespace

Classes: RestrictedLazy

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent_task_queue, input_enumerable, options = {}, &block) ⇒ ParallelEnumerable

options:

:ordered [true|false] - whether the output should stay in the same order
as the input (even though it may not actually be processed in that
order). Default: true

:stop_on_exception [true|false] - if true, when an exception occurs in either
input or output, we wait for any outstanding processing to complete,
but will not process any new inputs. Default: false

:main_thread_processing [true|false] - whether the main thread pulling
on each() is allowed to process inputs. Default: true
NOTE: If you set this to false, parallelizer.kill will stop each()
in its tracks, so you need to know for sure that won't happen.


20
21
22
23
24
25
26
27
28
29
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 20

# Stores the collaborators and sets up empty bookkeeping containers.
#
# @param parent_task_queue [Object] kept as-is; also handed to any copy made
#   by restricted_copy
# @param input_enumerable [Enumerable] the inputs this object works from
# @param options [Hash] recognised keys are :ordered, :stop_on_exception and
#   :main_thread_processing
# @param block [Proc] stored for later use (presumably applied to each input;
#   the processing code is outside this view)
def initialize(parent_task_queue, input_enumerable, options = {}, &block)
  @parent_task_queue, @input_enumerable = parent_task_queue, input_enumerable
  @options, @block = options, block

  # All three containers start out empty.
  @in_process = {}
  @unconsumed_input = Queue.new
  @unconsumed_output = Queue.new
end

Instance Attribute Details

#block ⇒ Object (readonly)

Returns the value of attribute block.



34
35
36
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 34

# Reader for the block that was stored by the constructor.
#
# @return [Proc, nil] the value held in @block
def block; @block; end

#input_enumerable ⇒ Object (readonly)

Returns the value of attribute input_enumerable.



32
33
34
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 32

# Reader for the input collection that was stored by the constructor.
#
# @return [Object] the value held in @input_enumerable
def input_enumerable; @input_enumerable; end

#options ⇒ Object (readonly)

Returns the value of attribute options.



33
34
35
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 33

# Reader for the options hash that was stored by the constructor.
#
# @return [Object] the value held in @options
def options; @options; end

#parent_task_queue ⇒ Object (readonly)

Returns the value of attribute parent_task_queue.



31
32
33
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 31

# Reader for the task queue that was stored by the constructor.
#
# @return [Object] the value held in @parent_task_queue
def parent_task_queue; @parent_task_queue; end

Instance Method Details

#count(*args, &block) ⇒ Object



87
88
89
90
91
92
93
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 87

# Counts elements.
#
# Called bare (no argument, no block) it asks the input enumerable for its
# own count instead of iterating this object. Any other call shape is
# forwarded unchanged to original_count.
#
# @param args [Array] optional arguments, passed through to original_count
# @param block [Proc] optional block, passed through to original_count
# @return [Object] whatever the chosen count implementation returns
def count(*args, &block)
  return @input_enumerable.count if args.empty? && !block

  original_count(*args, &block)
end

#drop(n) ⇒ Object



103
104
105
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 103

# Skips the first n inputs and returns the results for the rest.
#
# The inputs are trimmed first; the remainder is wrapped with
# restricted_copy and then collected into an array.
#
# @param n [Integer] number of leading inputs to skip
# @return [Array] results for the remaining inputs
def drop(n)
  remaining_inputs = @input_enumerable.drop(n)
  restricted_copy(remaining_inputs).to_a
end

#each ⇒ Object



36
37
38
39
40
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 36

# Yields each output value on its own.
#
# Built on each_with_input; everything it yields after the output is
# ignored here.
#
# @yield [output] one output value at a time
# @return [Object] whatever each_with_input returns
def each
  each_with_input { |output, *_details| yield output }
end

#each_with_exceptions(&block) ⇒ Object



64
65
66
67
68
69
70
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 64

# Picks the ordered or unordered iteration strategy and hands it the block.
#
# Only an explicit `ordered: false` selects the unordered variant; a missing
# or nil :ordered option means ordered.
#
# @param block [Proc] forwarded untouched to the chosen implementation
# @return [Object] whatever the chosen implementation returns
def each_with_exceptions(&block)
  return each_with_exceptions_unordered(&block) if @options[:ordered] == false

  each_with_exceptions_ordered(&block)
end

#each_with_index ⇒ Object



42
43
44
45
46
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 42

# Yields each output together with the index reported by each_with_input.
#
# @yield [output, index] the output value and its index
# @return [Object] whatever each_with_input returns
def each_with_index
  each_with_input { |output, index, *_rest| yield output, index }
end

#each_with_input ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 48

# Yields (output, index, input) for every successful result and turns
# exception entries into raised exceptions.
#
# When :ordered is explicitly false, the first exception seen is remembered
# and raised only after iteration has finished. Otherwise an exception entry
# is raised on the spot.
#
# @yield [output, index, input] one successful result at a time
# @return [nil] when iteration ends without any exception entry
# @raise [Exception] the exception carried by an :exception entry
def each_with_input
  first_failure = nil
  each_with_exceptions do |result, position, source, kind|
    # Ordinary results go straight to the caller.
    next yield(result, position, source) unless kind == :exception

    # Any setting other than an explicit false fails fast.
    raise result unless @options[:ordered] == false

    first_failure ||= result
  end
  raise first_failure if first_failure
end

#first(n = nil) ⇒ Object



95
96
97
98
99
100
101
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 95

# Returns the result for the first input, or for the first n inputs.
#
# With a count, the inputs are trimmed to n items, wrapped with
# restricted_copy and collected into an array. Without one, the single-item
# form is used and its only element (or nil) is returned.
#
# @param n [Integer, nil] how many leading results to return
# @return [Array, Object, nil] an array when n is given, otherwise one result
def first(n = nil)
  return first(1)[0] unless n

  leading_inputs = @input_enumerable.first(n)
  restricted_copy(leading_inputs).to_a
end

#flatten(levels = nil) ⇒ Object



107
108
109
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 107

# Wraps this object in a FlattenEnumerable.
#
# @param levels [Integer, nil] passed through to FlattenEnumerable unchanged
# @return [FlattenEnumerable] a new wrapper around self
def flatten(levels = nil)
  flattened_view = FlattenEnumerable.new(self, levels)
  flattened_view
end

#lazy ⇒ Object



139
140
141
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 139

# Builds a RestrictedLazy from this object and the enumerator returned by
# original_lazy.
#
# @return [RestrictedLazy] a new wrapper holding self and the original lazy
def lazy
  unrestricted = original_lazy
  RestrictedLazy.new(self, unrestricted)
end

#original_count ⇒ Object



85
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 85

# Keep the inherited count reachable under another name; the count defined
# later in this class calls it when given an argument or a block.
alias original_count count

#original_lazy ⇒ Object



137
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 137

# Keep the inherited lazy reachable under another name; the lazy defined
# later in this class passes its result to RestrictedLazy.
alias original_lazy lazy

#restricted_copy(enumerable) ⇒ Object

Enumerable methods



81
82
83
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 81

# Builds a new ParallelEnumerable over a different set of inputs.
#
# The copy gets this object's task queue, options and block; only the input
# enumerable differs.
#
# @param enumerable [Enumerable] the inputs for the copy
# @return [ParallelEnumerable] the newly constructed copy
def restricted_copy(enumerable)
  ParallelEnumerable.new(
    @parent_task_queue,
    enumerable,
    @options,
    &@block
  )
end

#take(n) ⇒ Object



111
112
113
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 111

# Returns the results for the first n inputs.
#
# The inputs are trimmed first; the kept items are wrapped with
# restricted_copy and then collected into an array.
#
# @param n [Integer] number of leading inputs to keep
# @return [Array] results for the kept inputs
def take(n)
  kept_inputs = @input_enumerable.take(n)
  restricted_copy(kept_inputs).to_a
end

#wait ⇒ Object



72
73
74
75
76
77
78
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 72

# Runs the unordered iteration to completion, discarding normal outputs.
#
# The first exception entry seen is remembered and raised once iteration is
# over; later exception entries are dropped.
#
# @return [nil] when no exception entry was seen
# @raise [Exception] the first exception carried by an :exception entry
def wait
  first_failure = nil
  each_with_exceptions_unordered do |result, _position, _source, kind|
    next unless kind == :exception

    first_failure ||= result
  end
  raise first_failure if first_failure
end