Class: InParallel::InParallelExecutor
- Inherits: Object
  - Object
  - InParallel::InParallelExecutor
- Defined in:
- lib/in_parallel.rb
Defined Under Namespace
Classes: BlankBindingParallelProxy
Constant Summary
- @@parallel_signal_interval =
Number of seconds between messages reporting that we are still waiting for child processes. A value of 0 or less disables these messages.
30
- @@parallel_default_timeout =
1800
- @@process_infos =
[]
- @@background_objs =
[]
- @@result_id =
0
- @@pids =
[]
- @@main_pid =
Process.pid
Class Method Summary
-
._execute_in_parallel(method_sym, obj = self, &block) ⇒ Object
Private method that executes a block of code in a separate process and stores its STDOUT and return value for later retrieval.
- .fork_supported? ⇒ Boolean
- .logger ⇒ Object
- .logger=(value) ⇒ Object
- .main_pid ⇒ Object
- .parallel_default_timeout ⇒ Object
- .parallel_default_timeout=(value) ⇒ Object
- .process_infos ⇒ Object
-
.run_in_background(ignore_result = true, &block) ⇒ Object
Runs all methods within the block in parallel in the background.
-
.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block) ⇒ Object
Runs all methods within the block in parallel and waits for them to complete.
-
.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false) ⇒ Object
Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class.
Class Method Details
._execute_in_parallel(method_sym, obj = self, &block) ⇒ Object
Private method that executes a block of code in a separate process and stores its STDOUT and return value for later retrieval.
# File 'lib/in_parallel.rb', line 184

def self._execute_in_parallel(method_sym, obj = self, &block)
  ret_val = nil
  # Communicate the return value of the method or block
  read_result, write_result = IO.pipe
  Dir.mkdir('tmp') unless Dir.exists? 'tmp'
  pid = fork do
    stdout_file = File.new("tmp/pp_#{Process.pid}", 'w')
    exit_status = 0
    trap(:INT) do
      # Can't use logger inside of trap
      puts "Warning: Interrupt received in child process; exiting #{Process.pid}"
      kill_child_processes
      return
    end

    # IO buffer is 64kb, which isn't much... if debug logging is turned on,
    # this can be exceeded before a process completes.
    # Storing output in file rather than using IO.pipe
    STDOUT.reopen(stdout_file)
    STDERR.reopen(stdout_file)

    begin
      # close subprocess's copy of read_result since it only needs to write
      read_result.close
      ret_val = obj.instance_eval(&block)
      ret_val = strip_singleton(ret_val)
      # In case there are other types that can't be dumped
      begin
        # Write the result to the write_result IO stream.
        Marshal.dump(ret_val, write_result) unless ret_val.nil?
      rescue StandardError => err
        @@logger.warn "Warning: return value from child process #{ret_val} " +
                      "could not be transferred to parent process: #{err.message}"
      end
    rescue Exception => err
      @@logger.error "Error in process #{Process.pid}: #{err.message}"
      # Return the error if an error is rescued so we can re-throw in the main process.
      Marshal.dump(err, write_result)
      exit_status = 1
    ensure
      write_result.close
      exit exit_status
    end
  end
  @@logger.info "Forked process for #{method_sym} - PID = '#{pid}'"
  write_result.close
  # Process.detach returns a thread that will be nil if the process is still running and thr if not.
  # This allows us to check to see if processes have exited without having to call the blocking Process.wait functions.
  wait_thread = Process.detach(pid)
  # store the IO object with the STDOUT and waiting thread for each pid
  process_info = { :wait_thread => wait_thread,
                   :pid => pid,
                   :method_sym => method_sym,
                   :std_out => "tmp/pp_#{pid}",
                   :result => read_result,
                   :tmp_result => "unresolved_parallel_result_#{@@result_id}",
                   :result_buffer => StringIO.new,
                   :index => @@process_infos.count }
  @@process_infos.push(process_info)
  @@result_id += 1
  process_info
end
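The core technique in the listing above (fork a child, send its return value back through a pipe with Marshal, and re-raise a marshalled exception in the parent) can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: `call_in_child` is a hypothetical helper name, it blocks until the child finishes, and it assumes a platform where `fork` is available.

```ruby
# Hypothetical helper (not part of in_parallel): runs the given block in a
# forked child and returns the block's value to the parent over a pipe.
def call_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close                      # the child only writes
    status = 0
    begin
      Marshal.dump(yield, writer)     # serialize the block's return value
    rescue StandardError => e
      Marshal.dump(e, writer)         # send the error object instead
      status = 1
    ensure
      writer.close
      exit!(status)                   # skip at_exit handlers in the child
    end
  end
  writer.close                        # the parent only reads
  payload = reader.read               # returns once the child closes its end
  reader.close
  Process.wait(pid)
  value = payload.empty? ? nil : Marshal.load(payload)
  raise value if value.is_a?(Exception)
  value
end

puts call_in_child { 6 * 7 }
```

Only values that Marshal can serialize cross the process boundary this way, which is why the listing above strips singletons and rescues dump failures.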
.fork_supported? ⇒ Boolean
# File 'lib/in_parallel.rb', line 248

def self.fork_supported?
  @@supported ||= Process.respond_to?(:fork)
  @@logger.warn 'Warning: Fork is not supported on this OS, executing block normally' unless @@supported
  @@supported
end
.logger ⇒ Object
# File 'lib/in_parallel.rb', line 39

def self.logger
  @@logger
end
.logger=(value) ⇒ Object
# File 'lib/in_parallel.rb', line 43

def self.logger=(value)
  @@logger = value
end
.main_pid ⇒ Object
# File 'lib/in_parallel.rb', line 27

def self.main_pid
  @@main_pid
end
.parallel_default_timeout ⇒ Object
# File 'lib/in_parallel.rb', line 31

def self.parallel_default_timeout
  @@parallel_default_timeout
end
.parallel_default_timeout=(value) ⇒ Object
# File 'lib/in_parallel.rb', line 35

def self.parallel_default_timeout=(value)
  @@parallel_default_timeout = value
end
.process_infos ⇒ Object
# File 'lib/in_parallel.rb', line 16

def self.process_infos
  @@process_infos
end
.run_in_background(ignore_result = true, &block) ⇒ Object
Runs all methods within the block in parallel in the background
Example - spawns background processes for the methods called in the block and returns immediately:
Parallel.run_in_background do
  @result_1 = method1
  @result_2 = method2
end
# Do something else here before waiting for the processes to complete
# Optionally wait for the processes to complete before continuing.
# Otherwise use run_in_background(true) to clean up the process status and output immediately.
wait_for_processes(self)
NOTE: You must call get_background_results so that instance variables in the calling object are set; otherwise @result_1 will evaluate to “unresolved_parallel_result_0”.
# File 'lib/in_parallel.rb', line 79

def self.run_in_background(ignore_result = true, &block)
  if fork_supported?
    proxy = BlankBindingParallelProxy.new(block.binding)
    proxy.instance_eval(&block)

    if ignore_result
      Process.detach(@@process_infos.last[:pid])
      @@process_infos.pop
    else
      @@background_objs << { :proxy => proxy, :target => block.binding }
      return process_infos.last[:tmp_result]
    end
    return
  end
  # if fork is not supported
  result = block.call
  return nil if ignore_result
  result
end
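When the result is ignored, the listing above hands the child to `Process.detach`, so the parent never has to call a blocking `Process.wait`. A minimal standalone sketch of that call follows; `spawn_detached` is a hypothetical helper name, and the sketch assumes `fork` is available.

```ruby
# Hypothetical helper: forks a short-lived child and detaches it.
# Process.detach returns a Thread right away; that thread reaps the child
# when it exits, and its value is the child's Process::Status.
def spawn_detached(exit_code)
  pid = fork do
    sleep 0.1
    exit!(exit_code)
  end
  [pid, Process.detach(pid)]
end

pid, waiter = spawn_detached(3)
# ... the parent is free to do other work here ...
status = waiter.value   # blocks only if the child has not exited yet
puts "child #{pid} exited with status #{status.exitstatus}"
```

Calling `waiter.alive?` instead of `waiter.value` checks whether the child is still running without blocking.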
.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block) ⇒ Object
Runs all methods within the block in parallel and waits for them to complete
Example - spawns 2 processes (1 for each method), waits until they both complete, and logs STDOUT:
InParallel.run_in_parallel do
  @result_1 = method1
  @result_2 = method2
end
NOTE: Only supports assigning instance variables within the block, not local variables.
# File 'lib/in_parallel.rb', line 55

def self.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block)
  if fork_supported?
    proxy = BlankBindingParallelProxy.new(block.binding)
    proxy.instance_eval(&block)
    return wait_for_processes(proxy, block.binding, timeout, kill_all_on_error)
  end
  # if fork is not supported
  block.call
end
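Because the block is evaluated with `instance_eval` on a proxy object, an instance variable assigned inside the block is set on the proxy rather than on the caller, and has to be copied back afterwards. The sketch below illustrates only that Ruby behavior. `PlaceholderProxy` and `Example` are hypothetical classes invented for illustration; this is not how `BlankBindingParallelProxy` is implemented, and nothing is forked here.

```ruby
# Hypothetical proxy: every unknown method call returns a placeholder string
# instead of running the real method.
class PlaceholderProxy
  def method_missing(name, *_args)
    "placeholder_for_#{name}"
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

# Hypothetical caller that evaluates a block on the proxy, then copies the
# proxy's instance variables onto itself.
class Example
  attr_reader :result_1

  def collect
    proxy = PlaceholderProxy.new
    # Inside instance_eval, self is the proxy, so @result_1 is set on it.
    proxy.instance_eval { @result_1 = method1 }
    proxy.instance_variables.each do |ivar|
      instance_variable_set(ivar, proxy.instance_variable_get(ivar))
    end
    self
  end
end

puts Example.new.collect.result_1
```

A local variable assigned in the block has no comparable hook for being rewritten later, which is consistent with the note that only instance variables are supported.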
.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false) ⇒ Object
Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class
# File 'lib/in_parallel.rb', line 104

def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false)
  raise_error = nil
  timeout ||= @@parallel_default_timeout
  send_int = false
  trap(:INT) do
    # Can't use logger inside of trap
    puts "Warning, recieved interrupt. Processing child results and exiting."
    send_int = true
    kill_child_processes
  end
  return unless Process.respond_to?(:fork)
  # Custom process to wait so that we can do things like time out, and kill child processes if
  # one process returns with an error before the others complete.
  results_map = Array.new(@@process_infos.count)
  start_time = Time.now
  timer = start_time
  while !@@process_infos.empty? do
    if @@parallel_signal_interval > 0 && Time.now > timer + @@parallel_signal_interval
      @@logger.debug 'Waiting for child processes.'
      timer = Time.now
    end
    if Time.now > start_time + timeout
      kill_child_processes
      raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
    end

    if result = IO.select(@@process_infos.map {|p| p[:result]}, nil, nil, 0.5)
      read_ios = result.first
      read_ios.each do |reader|
        process_info = @@process_infos.find {|p| p[:result] == reader}
        process_info[:result_buffer] << reader.read
        if reader.eof?
          result = process_info[:result_buffer].string
          # the process completed, get the result and rethrow on error.
          begin
            # Print the STDOUT and STDERR for each process with signals for start and end
            @@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            # Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
            # So don't use logger, just use puts.
            puts " " + File.new(process_info[:std_out], 'r').readlines.join(" ")
            @@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
            marshalled_result = (result.nil? || result.empty?) ? result : Marshal.load(result)
            # Kill all other processes and let them log their stdout before re-raising
            # if a child process raised an error.
            if marshalled_result.is_a?(Exception)
              raise_error = marshalled_result.dup
              kill_child_processes if kill_all_on_error
              marshalled_result = nil
            end
            results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
          ensure
            File.delete(process_info[:std_out]) if File.exists?(process_info[:std_out])
            # close the read end pipe
            process_info[:result].close unless process_info[:result].closed?
            @@process_infos.delete(process_info)
          end
        end
      end
    end
  end

  results = []
  # pass in the 'self' from the block.binding which is the instance of the class
  # that contains the initial binding call.
  # This gives us access to the instance variables from that context.
  results = result_lookup(proxy, binding, results_map) if binding

  # If there are background_objs AND results, don't return the background obj results
  # (which would mess up expected results from each_in_parallel),
  # but do process their results in case they are assigned to instance variables
  @@background_objs.each { |obj| result_lookup(obj[:proxy], obj[:target], results_map) }
  @@background_objs.clear

  Process.kill("INT", Process.pid) if send_int
  raise raise_error unless raise_error.nil?
  return results
end
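The wait loop above polls every child's result pipe with `IO.select`, using a short select timeout so that it can also enforce an overall deadline. A simplified standalone sketch of that polling pattern is shown below. `spawn_writer` and `read_all_pipes` are hypothetical names, the sketch raises as soon as the deadline passes instead of killing child processes, and it assumes `fork` is available.

```ruby
# Hypothetical helper: forks a child that writes text to a pipe, and
# returns the read end to the parent.
def spawn_writer(text)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(text)
    writer.close
    exit!(0)
  end
  writer.close
  Process.detach(pid)   # reap the child without blocking
  reader
end

# Hypothetical helper: collects everything written to each reader, in the
# order the readers were given, or raises once the deadline has passed.
def read_all_pipes(readers, timeout)
  buffers = readers.map { +"" }
  pending = readers.dup
  deadline = Time.now + timeout
  until pending.empty?
    raise "timed out after #{timeout} seconds" if Time.now > deadline
    ready, = IO.select(pending, nil, nil, 0.5)   # wake up at least every 0.5s
    next if ready.nil?
    ready.each do |io|
      begin
        buffers[readers.index(io)] << io.read_nonblock(4096)
      rescue IO::WaitReadable
        next                                     # nothing to read yet
      rescue EOFError
        io.close                                 # the writer closed its end
        pending.delete(io)
      end
    end
  end
  buffers
end

p read_all_pipes([spawn_writer("one"), spawn_writer("two")], 5)
```

Indexing the buffers by each reader's position keeps the results in submission order even when children finish out of order, much as `results_map` is indexed by `process_info[:index]` in the listing above.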