Class: InspecPlugins::Parallelism::Runner
- Inherits:
-
Object
- Object
- InspecPlugins::Parallelism::Runner
- Defined in:
- lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb
Instance Attribute Summary collapse
-
#invocations ⇒ Object
Returns the value of attribute invocations.
-
#log_path ⇒ Object
Returns the value of attribute log_path.
-
#run_in_background ⇒ Object
Returns the value of attribute run_in_background.
-
#sub_cmd ⇒ Object
Returns the value of attribute sub_cmd.
-
#total_jobs ⇒ Object
Returns the value of attribute total_jobs.
Instance Method Summary collapse
-
#cleanup_child_processes ⇒ Object
Still in the parent: loop over children and check for finished processes.
- #cleanup_daemon_process ⇒ Object
- #cleanup_empty_error_log_files ⇒ Object
- #fork_another_process ⇒ Object
-
#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner
constructor
A new instance of Runner.
- #initiate_background_run ⇒ Object
- #kill_child_processes ⇒ Object
- #rename_error_log_files ⇒ Object
- #run ⇒ Object
- #should_start_more_jobs? ⇒ Boolean
- #spawn_another_process ⇒ Object
- #update_ui_poll_select(target_pid = nil) ⇒ Object
- #update_ui_with_line(pid, update_line) ⇒ Object
- #wait_for_child_processes_to_die ⇒ Object
Constructor Details
#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner
Returns a new instance of Runner.
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 10
#
# Builds a runner for a queue of invocations.
#
# @param invocations [Array] pending jobs; other methods in this class consume
#   them with shift and read each entry's :value
# @param cli_options [Hash] string-keyed options; the keys read here are
#   "jobs", "bg", "ui" and "log_path"
# @param sub_cmd [String] sub-command placed between the program name and the
#   invocation when a child is started via Process.spawn
def initialize(invocations, cli_options, sub_cmd = "exec")
  @invocations = invocations
  @sub_cmd = sub_cmd
  # Fall back to the physical core count when no job limit was supplied.
  @total_jobs = cli_options["jobs"] || Concurrent.physical_processor_count
  # pid => { io: <read end of that child's pipe> } for children still running.
  @child_tracker = {}
  # pid => { error_log_file: ... }; filled by spawn_another_process and read by
  # rename_error_log_files once the run is over.
  @child_tracker_persisted = {}
  @run_in_background = cli_options["bg"]
  # A UI reporter is only built for foreground runs; @ui stays nil otherwise.
  unless run_in_background
    @ui = InspecPlugins::Parallelism::SuperReporter.make(cli_options["ui"], total_jobs, invocations)
  end
  @log_path = cli_options["log_path"]
end
Instance Attribute Details
#invocations ⇒ Object
Returns the value of attribute invocations.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def invocations @invocations end |
#log_path ⇒ Object
Returns the value of attribute log_path.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def log_path @log_path end |
#run_in_background ⇒ Object
Returns the value of attribute run_in_background.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def run_in_background @run_in_background end |
#sub_cmd ⇒ Object
Returns the value of attribute sub_cmd.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def sub_cmd @sub_cmd end |
#total_jobs ⇒ Object
Returns the value of attribute total_jobs.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def total_jobs @total_jobs end |
Instance Method Details
#cleanup_child_processes ⇒ Object
Still in the parent: loop over children and check for finished processes.
168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 168
#
# Parent-side reaping pass. Polls every tracked child without blocking; for
# each one that has exited it does a final read of the child's pipe, logs the
# exit status, notifies the UI (foreground runs only) and stops tracking it.
def cleanup_child_processes
  @child_tracker.each do |child_pid, _info|
    # With WNOHANG a nil result means this child is still running.
    next unless Process.wait(child_pid, Process::WNOHANG)

    # Expect to (probably) find the EOF marker on the pipe; this targeted
    # read closes the pipe if so.
    update_ui_poll_select(child_pid)

    # $? carries the status of the child that was just reaped.
    exit_line = "#{Time.now.iso8601} Exit code: #{$?}\n"
    create_logs(child_pid, exit_line)

    @ui.child_exited(child_pid) unless run_in_background
    @child_tracker.delete(child_pid)
  end
end
#cleanup_daemon_process ⇒ Object
54 55 56 57 58 59 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 54 def cleanup_daemon_process current_process_id = Process.pid Process.kill(9, current_process_id) # DO NOT TRY TO REFACTOR IT THIS WAY # Calling Process.kill(9,Process.pid) kills the "stopper" process itself, rather than the one it's trying to stop. end |
#cleanup_empty_error_log_files ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 61
#
# Deletes every "*.err" file under "<log_path or current directory>/logs"
# that still exists and has no content.
def cleanup_empty_error_log_files
  base_dir = log_path || Dir.pwd
  Dir.glob("#{base_dir}/logs/*.err").each do |err_path|
    next unless File.exist?(err_path)
    # File.size? is nil for an empty file and the byte count otherwise.
    next if File.size?(err_path)

    File.delete(err_path)
  end
end
#fork_another_process ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 134
#
# Takes the next invocation off the queue and runs it in a forked child.
#
# Parent: keeps the read end of a fresh pipe in @child_tracker under the
# child's pid and, for foreground runs, tells the UI about the new child.
# Child: points $stdout at the write end of the pipe, runs the invocation and
# always terminates via exit(42) if control ever comes back.
def fork_another_process
  invocation = invocations.shift[:value] # Be sure to do this shift() in parent process
  # thing_that_reads_from_the_child, thing_that_writes_to_the_parent = IO.pipe
  child_reader, parent_writer = IO.pipe
  if (child_pid = Process.fork)
    # In parent with newly forked child
    parent_writer.close
    @child_tracker[child_pid] = { io: child_reader }
    @ui.child_forked(child_pid, invocation) unless run_in_background
  else
    # In child
    child_reader.close
    # replace stdout with writer
    $stdout = parent_writer
    create_logs(Process.pid, nil, $stderr)

    begin
      create_logs(
        Process.pid,
        "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
      )
      runner_invocation(invocation)
    rescue StandardError => e
      $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
      $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
    end

    # should be unreachable but child MUST exit
    exit(42)
  end
end
#initiate_background_run ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 46
#
# Switches the current process into background mode. When
# Inspec.locally_windows? is true the run is ended through
# Inspec::UI#exit(:usage_error) instead; otherwise the process is daemonized.
def initiate_background_run
  return Inspec::UI.new.exit(:usage_error) if Inspec.locally_windows?

  # (nochdir, noclose) = (true, true): keep the working directory and the
  # standard streams as they are.
  Process.daemon(true, true)
end
#kill_child_processes ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 71
#
# Sends SIGKILL to every tracked child, then waits until they are gone.
# A failure to signal one child is reported on $stderr and does not stop the
# remaining children from being signalled.
def kill_child_processes
  @child_tracker.each_key do |pid|
    Process.kill("SIGKILL", pid)
  rescue StandardError => e
    # StandardError covers the Errno::* failures of kill without also
    # swallowing signals or SystemExit the way `rescue Exception` would.
    $stderr.puts "Error while shutting down process #{pid}: #{e.message}"
  end
  # Waiting for child processes to die after they have been killed
  wait_for_child_processes_to_die
end
#rename_error_log_files ⇒ Object
95 96 97 98 99 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 95
#
# Hands each recorded error-log file, together with the pid of the child it
# belongs to, to rename_error_log. #run only calls this when
# Inspec.locally_windows? is true.
def rename_error_log_files
  @child_tracker_persisted.each_pair do |child_pid, details|
    rename_error_log(details[:error_log_file], child_pid)
  end
end
#run ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 23
#
# Main loop: keeps starting children while capacity and queued invocations
# remain, pumps their output, reaps the ones that finished, and repeats until
# both the queue and the set of tracked children are empty. Log housekeeping
# runs afterwards.
def run
  # Running as a daemon changes the parent process pid, so daemonize first.
  initiate_background_run if run_in_background

  while !invocations.empty? || !@child_tracker.empty?
    while should_start_more_jobs?
      Inspec.locally_windows? ? spawn_another_process : fork_another_process
    end

    update_ui_poll_select
    cleanup_child_processes
    sleep 0.1
  end

  # Renaming is only required on Windows. Rename and delete only after all
  # child processes have exited.
  rename_error_log_files if Inspec.locally_windows?
  cleanup_empty_error_log_files
  cleanup_daemon_process if run_in_background
end
#should_start_more_jobs? ⇒ Boolean
101 102 103 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 101
#
# @return [Boolean] true when fewer than total_jobs children are tracked and
#   at least one invocation is still queued
def should_start_more_jobs?
  return false unless @child_tracker.size < total_jobs

  !invocations.empty?
end
#spawn_another_process ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 105
#
# Takes the next invocation off the queue and runs it as a separate process
# started with Process.spawn (#run uses this path when
# Inspec.locally_windows? is true).
#
# The child's stdout goes to a fresh pipe whose read end is tracked in
# @child_tracker; its stderr goes to a new "<nsec>.err" file under
# "<log_path or current directory>/logs". Any StandardError raised while
# setting this up is reported on $stderr rather than propagated.
def spawn_another_process
  invocation = invocations.shift[:value]

  child_reader, parent_writer = IO.pipe
  # NOTE(review): unlike fork_another_process, the parent never closes
  # parent_writer here — confirm whether that is intentional.
  error_log_file = nil
  begin
    logs_dir_path = log_path || Dir.pwd
    log_dir = File.join(logs_dir_path, "logs")
    FileUtils.mkdir_p(log_dir)
    error_log_file = File.open("#{log_dir}/#{Time.now.nsec}.err", "a+")
    cmd = "#{$0} #{sub_cmd} #{invocation}"
    log_msg = "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
    child_pid = Process.spawn(cmd, out: parent_writer, err: error_log_file.path)

    # Logging
    create_logs(child_pid, log_msg)
    @child_tracker[child_pid] = { io: child_reader }

    # This is used to rename error log files after all child processes are exited
    @child_tracker_persisted[child_pid] = { error_log_file: error_log_file }
    @ui.child_spawned(child_pid, invocation)

    # Close the file to unlock the error log files opened by processes
    error_log_file.close
  rescue StandardError => e
    $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
    $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
  ensure
    # Release the handle when a failure happened after the file was opened.
    error_log_file.close if error_log_file && !error_log_file.closed?
  end
end
#update_ui_poll_select(target_pid = nil) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 183 def update_ui_poll_select(target_pid = nil) # Focus on one pid's pipe if specified, otherwise poll all pipes pipes_for_reading = target_pid ? [ @child_tracker[target_pid][:io] ] : @child_tracker.values.map { |i| i[:io] } # Next line is due to a race between the close() and the wait()... shouldn't need it, but it fixes the race. pipes_for_reading.reject!(&:closed?) ready_pipes = IO.select(pipes_for_reading, [], [], 0.1) return unless ready_pipes ready_pipes[0].each do |pipe_ready_for_reading| # If we weren't provided a PID, hackishly look up the pid from the matching IO. pid = target_pid || @child_tracker.keys.detect { |p| @child_tracker[p][:io] == pipe_ready_for_reading } begin while (update_line = pipe_ready_for_reading.readline) && !pipe_ready_for_reading.closed? if update_line =~ /EOF_MARKER/ pipe_ready_for_reading.close break elsif update_line =~ /WARN/ || update_line =~ /ERROR/ || update_line =~ /INFO/ create_logs( pid, "#{Time.now.iso8601} Extra log: #{update_line}\n" ) break end update_ui_with_line(pid, update_line) unless run_in_background # Only pull one line if we are doing normal updates; slurp the whole file # if we are doing a final pull on a targeted PID break unless target_pid end rescue EOFError # On unix, readline throws an EOFError when we hit the end. On Windows, nothing apparently happens. pipe_ready_for_reading.close next end end # TODO: loop over ready_pipes[2] and handle errors? end |
#update_ui_with_line(pid, update_line) ⇒ Object
220 221 222 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 220 def update_ui_with_line(pid, update_line) @ui.child_status_update_line(pid, update_line) end |
#wait_for_child_processes_to_die ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 81
#
# Blocks until no tracked children remain, reaping whichever child exits
# next and removing it from @child_tracker.
#
# When the OS reports that this process has no children left
# (Errno::ECHILD), any entries still in the tracker are stale; they are
# dropped so the loop ends instead of spinning on the same error.
def wait_for_child_processes_to_die
  until @child_tracker.empty?
    begin
      # -1 waits for any child; WNOHANG returns nil when none has exited yet.
      exited_pid = Process.waitpid(-1, Process::WNOHANG)
      @child_tracker.delete(exited_pid) if exited_pid && exited_pid > 0
      sleep 1
    rescue Errno::ECHILD
      Inspec::Log.info "Processes shutdown complete!"
      # Nothing is left to wait for; forget entries that can never be reaped.
      @child_tracker.clear
    rescue StandardError => e
      Inspec::Log.debug "Error while waiting for child processes to shutdown: #{e.message}"
    end
  end
end