Class: InParallel::InParallelExecutor

Inherits: Object
Defined in:
lib/in_parallel.rb

Defined Under Namespace

Classes: BlankBindingParallelProxy

Constant Summary

@@parallel_signal_interval = 30
  How many seconds between outputting to stdout that we are still waiting for child processes. A value of 0 or less disables this signaling.

@@parallel_default_timeout = 1800

@@process_infos = []

@@background_objs = []

@@result_id = 0

@@pids = []

@@main_pid = Process.pid

Class Method Summary

  • ._execute_in_parallel(method_sym, obj = self, &block) ⇒ Object
  • .fork_supported? ⇒ Boolean
  • .logger ⇒ Object
  • .logger=(value) ⇒ Object
  • .main_pid ⇒ Object
  • .parallel_default_timeout ⇒ Object
  • .parallel_default_timeout=(value) ⇒ Object
  • .process_infos ⇒ Object
  • .run_in_background(ignore_result = true, &block) ⇒ Object
  • .run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block) ⇒ Object
  • .wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false) ⇒ Object

Class Method Details

._execute_in_parallel(method_sym, obj = self, &block) ⇒ Object

Private method that executes a block of code in a separate process and stores its STDOUT and return value for later retrieval.



# File 'lib/in_parallel.rb', line 184

def self._execute_in_parallel(method_sym, obj = self, &block)
  ret_val                   = nil
  # Communicate the return value of the method or block
  read_result, write_result = IO.pipe
  Dir.mkdir('tmp') unless Dir.exists? 'tmp'
  pid                       = fork do
    stdout_file = File.new("tmp/pp_#{Process.pid}", 'w')
    exit_status = 0
    trap(:INT) do
      # Can't use logger inside of trap
      puts "Warning: Interrupt received in child process; exiting #{Process.pid}"
      kill_child_processes
      return
    end

    # IO buffer is 64kb, which isn't much... if debug logging is turned on,
    # this can be exceeded before a process completes.
    # Storing output in file rather than using IO.pipe
    STDOUT.reopen(stdout_file)
    STDERR.reopen(stdout_file)

    begin
      # close subprocess's copy of read_result since it only needs to write
      read_result.close
      ret_val = obj.instance_eval(&block)
      ret_val = strip_singleton(ret_val)
      # In case there are other types that can't be dumped
      begin
        # Write the result to the write_result IO stream.
        Marshal.dump(ret_val, write_result) unless ret_val.nil?
      rescue StandardError => err
        @@logger.warn "Warning: return value from child process #{ret_val} " +
                          "could not be transferred to parent process: #{err.message}"
      end
    rescue Exception => err
      @@logger.error "Error in process #{Process.pid}: #{err.message}"
      # Return the error if an error is rescued so we can re-throw in the main process.
      Marshal.dump(err, write_result)
      exit_status = 1
    ensure
      write_result.close
      exit exit_status
    end
  end

  @@logger.info "Forked process for #{method_sym} - PID = '#{pid}'"
  write_result.close
  # Process.detach returns a wait thread for the child process; polling that thread lets us
  # check whether processes have exited without calling the blocking Process.wait functions.
  wait_thread  = Process.detach(pid)
  # store the IO object with the STDOUT and waiting thread for each pid
  process_info = { :wait_thread => wait_thread,
                   :pid         => pid,
                   :method_sym  => method_sym,
                   :std_out     => "tmp/pp_#{pid}",
                   :result      => read_result,
                   :tmp_result  => "unresolved_parallel_result_#{@@result_id}",
                   :result_buffer => StringIO.new,
                   :index       => @@process_infos.count }
  @@process_infos.push(process_info)
  @@result_id += 1
  process_info
end
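
For orientation, here is a minimal, standalone sketch (not taken from the gem) of the fork/pipe/Marshal pattern this method is built around: the child serializes its return value into the write end of a pipe, and the parent reads it back after the child exits.

read_result, write_result = IO.pipe
pid = fork do
  read_result.close                      # the child only writes
  value = { :answer => 42 }              # stand-in for the block's return value
  Marshal.dump(value, write_result)      # serialize the result across the process boundary
  write_result.close
  exit 0
end
write_result.close                       # the parent only reads
Process.wait(pid)
puts Marshal.load(read_result).inspect   # => {:answer=>42}
read_result.close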

.fork_supported? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/in_parallel.rb', line 248

def self.fork_supported?
  @@supported ||= Process.respond_to?(:fork)
  @@logger.warn 'Warning: Fork is not supported on this OS, executing block normally' unless @@supported
  @@supported
end
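
A hedged usage sketch: run_in_parallel and run_in_background already fall back to sequential execution when fork is unavailable, but the check can also be called directly to skip parallel-only setup (long_running_task is a hypothetical method):

if InParallel::InParallelExecutor.fork_supported?
  InParallel::InParallelExecutor.run_in_parallel { @result = long_running_task }
else
  @result = long_running_task   # e.g. on Windows or JRuby, where fork is unavailable
end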

.logger ⇒ Object



# File 'lib/in_parallel.rb', line 39

def self.logger
  @@logger
end

.logger=(value) ⇒ Object



# File 'lib/in_parallel.rb', line 43

def self.logger=(value)
  @@logger = value
end
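
A small configuration sketch (assumed usage, not taken from the gem's docs): any Logger-compatible object can be assigned, for example to quiet the info-level fork/wait messages:

require 'logger'

quiet_logger       = Logger.new($stdout)
quiet_logger.level = Logger::WARN        # suppress info-level output such as the "Forked process" messages
InParallel::InParallelExecutor.logger = quiet_logger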

.main_pid ⇒ Object



# File 'lib/in_parallel.rb', line 27

def self.main_pid
  @@main_pid
end

.parallel_default_timeout ⇒ Object



# File 'lib/in_parallel.rb', line 31

def self.parallel_default_timeout
  @@parallel_default_timeout
end

.parallel_default_timeout=(value) ⇒ Object



# File 'lib/in_parallel.rb', line 35

def self.parallel_default_timeout=(value)
  @@parallel_default_timeout = value
end
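
For example, to raise the limit from the 1800-second default before starting long-running work (a sketch, not from the gem's docs):

InParallel::InParallelExecutor.parallel_default_timeout = 3600   # allow child processes up to an hour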

.process_infos ⇒ Object



# File 'lib/in_parallel.rb', line 16

def self.process_infos
  @@process_infos
end

.run_in_background(ignore_result = true, &block) ⇒ Object

Runs all methods within the block in parallel in the background

Example - will spawn a process in the background for each method and return immediately:

InParallel.run_in_background do
  @result_1 = method1
  @result_2 = method2
end
# Do something else here before waiting for the process to complete

# Optionally wait for the processes to complete before continuing.
# Otherwise use run_in_background(true) to clean up the process status and output immediately.
wait_for_processes(self)

NOTE: You must call get_background_results to allow instance variables in the calling object to be set; otherwise @result_1 will evaluate to 'unresolved_parallel_result_0'.



# File 'lib/in_parallel.rb', line 79

def self.run_in_background(ignore_result = true, &block)
  if fork_supported?
    proxy = BlankBindingParallelProxy.new(block.binding)
    proxy.instance_eval(&block)

    if ignore_result
      Process.detach(@@process_infos.last[:pid])
      @@process_infos.pop
    else
      @@background_objs << { :proxy => proxy, :target => block.binding }
      return process_infos.last[:tmp_result]
    end
    return
  end
  # if fork is not supported
  result = block.call
  return nil if ignore_result
  result
end
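
A hedged sketch of the non-default path: with ignore_result set to false the process info is retained, the instance variable initially holds a placeholder such as 'unresolved_parallel_result_0', and a later wait_for_processes(self) resolves it to the child's real return value (create_backup is a hypothetical method):

InParallel::InParallelExecutor.run_in_background(false) do
  @backup = create_backup
end
# ... do unrelated work while the child runs ...
InParallel::InParallelExecutor.wait_for_processes(self)   # @backup now holds the real result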

.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block) ⇒ Object

Runs all methods within the block in parallel and waits for them to complete

Example - will spawn 2 processes (1 for each method), wait until they both complete, and log STDOUT:

InParallel.run_in_parallel do
  @result_1 = method1
  @result_2 = method2
end

NOTE: Only supports assigning instance variables within the block, not local variables



# File 'lib/in_parallel.rb', line 55

def self.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block)
  if fork_supported?
    proxy = BlankBindingParallelProxy.new(block.binding)
    proxy.instance_eval(&block)
    return wait_for_processes(proxy, block.binding, timeout, kill_all_on_error)
  end
  # if fork is not supported
  block.call
end
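
Both parameters can be supplied positionally. For instance, a sketch (with hypothetical provision_host and run_smoke_tests methods) that allows ten minutes for the batch and aborts everything as soon as one child raises:

InParallel::InParallelExecutor.run_in_parallel(600, true) do
  @host    = provision_host
  @results = run_smoke_tests
end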

.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false) ⇒ Object

Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class

Parameters:

  • proxy (Object) (defaults to: self)
    • The instance of the proxy class that the method was executed within (probably only useful when called by run_in_background)

  • binding (Object) (defaults to: nil)
    • The binding of the block, used to assign return values to instance variables (probably only useful when called by run_in_background)

  • timeout (Int) (defaults to: nil)
    • Time in seconds to wait before giving up on a child process

  • kill_all_on_error (Boolean) (defaults to: false)
    • Whether to wait for all processes to complete, or to fail immediately (killing all other forked processes) when one process errors



# File 'lib/in_parallel.rb', line 104

def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false)
  raise_error = nil
  timeout     ||= @@parallel_default_timeout
  send_int = false
  trap(:INT) do
    # Can't use logger inside of trap
    puts "Warning, recieved interrupt.  Processing child results and exiting."
    send_int = true
    kill_child_processes
  end
  return unless Process.respond_to?(:fork)
  # Custom process to wait so that we can do things like time out, and kill child processes if
  # one process returns with an error before the others complete.
  results_map = Array.new(@@process_infos.count)
  start_time  = Time.now
  timer       = start_time
  while !@@process_infos.empty? do
    if @@parallel_signal_interval > 0 && Time.now > timer + @@parallel_signal_interval
      @@logger.debug 'Waiting for child processes.'
      timer = Time.now
    end
    if Time.now > start_time + timeout
      kill_child_processes
      raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
    end

    if result = IO.select(@@process_infos.map {|p| p[:result]}, nil, nil, 0.5)
      read_ios = result.first
      read_ios.each do |reader|
        process_info = @@process_infos.find {|p| p[:result] == reader}
        process_info[:result_buffer] << reader.read
        if reader.eof?
          result = process_info[:result_buffer].string
          # the process completed, get the result and rethrow on error.
          begin
            # Print the STDOUT and STDERR for each process with signals for start and end
            @@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            # Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
            # So don't use logger, just use puts.
            puts "  " + File.new(process_info[:std_out], 'r').readlines.join("  ")
            @@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            marshalled_result = (result.nil? || result.empty?) ? result : Marshal.load(result)
            # Kill all other processes and let them log their stdout before re-raising
            # if a child process raised an error.
            if marshalled_result.is_a?(Exception)
              raise_error = marshalled_result.dup
              kill_child_processes if kill_all_on_error
              marshalled_result = nil
            end
            results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
          ensure
            File.delete(process_info[:std_out]) if File.exists?(process_info[:std_out])
            # close the read end pipe
            process_info[:result].close unless process_info[:result].closed?
            @@process_infos.delete(process_info)
          end
        end
      end
    end
  end

  results = []

  # pass in the 'self' from the block.binding which is the instance of the class
  # that contains the initial binding call.
  # This gives us access to the instance variables from that context.
  results = result_lookup(proxy, binding, results_map) if binding

  # If there are background_objs AND results, don't return the background obj results
  # (which would mess up expected results from each_in_parallel),
  # but do process their results in case they are assigned to instance variables
  @@background_objs.each { |obj| result_lookup(obj[:proxy], obj[:target], results_map) }
  @@background_objs.clear

  Process.kill("INT", Process.pid) if send_int
  raise raise_error unless raise_error.nil?
  return results
end
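
A hedged end-to-end sketch (sync_database is a hypothetical method): start background work with ignore_result false, then block on wait_for_processes with an explicit five-minute timeout, killing the remaining children if any one of them raises:

InParallel::InParallelExecutor.run_in_background(false) do
  @sync_status = sync_database
end
# ... other work in the parent process ...
InParallel::InParallelExecutor.wait_for_processes(self, nil, 300, true)
puts @sync_status   # resolved once all children have exited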