Class: InspecPlugins::Parallelism::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner

Returns a new instance of Runner.



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 10

# Builds a runner for a queue of invocations.
#
# @param invocations [Array] pending work items; other methods of this class
#   consume them with `shift` and read each entry's `:value`
# @param cli_options [Hash] parsed CLI options; the keys read here are
#   "jobs", "bg", "ui" and "log_path"
# @param sub_cmd [String] sub-command stored for later use when building
#   child command lines
def initialize(invocations, cli_options, sub_cmd = "exec")
  # Plain option copies first.
  @invocations = invocations
  @sub_cmd = sub_cmd
  @log_path = cli_options["log_path"]
  @run_in_background = cli_options["bg"]

  # Fall back to the physical core count when no job limit was requested.
  @total_jobs = cli_options["jobs"] || Concurrent.physical_processor_count

  # Bookkeeping for children, keyed by PID.
  @child_tracker = {}
  @child_tracker_persisted = {}

  # No reporter is created for background runs.
  return if run_in_background

  @ui = InspecPlugins::Parallelism::SuperReporter.make(cli_options["ui"], total_jobs, invocations)
end

Instance Attribute Details

#invocations ⇒ Object

Returns the value of attribute invocations.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

# @return [Object] the queue of invocations handed to the constructor; entries
#   still in it have not been started yet
def invocations = @invocations

#log_path ⇒ Object

Returns the value of attribute log_path.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

# @return [String, nil] the "log_path" CLI option; callers in this class fall
#   back to the working directory when it is nil
def log_path = @log_path

#run_in_background ⇒ Object

Returns the value of attribute run_in_background.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

# @return [Object] the "bg" CLI option; truthy when the run should be
#   daemonised and no reporter is used
def run_in_background = @run_in_background

#sub_cmd ⇒ Object

Returns the value of attribute sub_cmd.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

# @return [String] the sub-command given to the constructor ("exec" by default)
def sub_cmd = @sub_cmd

#total_jobs ⇒ Object

Returns the value of attribute total_jobs.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

# @return [Integer] upper bound on the number of children tracked at once
def total_jobs = @total_jobs

Instance Method Details

#cleanup_child_processes ⇒ Object

Still in the parent: loop over the children and check for finished processes.



168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 168

# Still in parent: reap every tracked child that has already exited.
#
# For each finished child this drains what is left on its pipe, records its
# exit status in the log, tells the UI (foreground runs only) and stops
# tracking the PID. Children that are still running are left untouched.
def cleanup_child_processes
  # Iterate over a snapshot of the PIDs because entries are removed from
  # @child_tracker inside the loop.
  @child_tracker.keys.each do |pid|
    # Non-blocking wait: nil means the child is still running.
    next unless Process.wait(pid, Process::WNOHANG)

    # Capture the status right away. The calls below may themselves wait on
    # other processes, which would overwrite the thread's last status.
    exit_status = Process.last_status

    # Expect to (probably) find EOF marker on the pipe, and close it if so
    update_ui_poll_select(pid)

    create_logs(pid, "#{Time.now.iso8601} Exit code: #{exit_status}\n")

    @ui.child_exited(pid) unless run_in_background
    @child_tracker.delete(pid)
  end
end

#cleanup_daemon_process ⇒ Object



54
55
56
57
58
59
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 54

# Ends the current process by sending signal 9 to its own PID.
# `run` calls this as its last step when the runner is in background mode.
def cleanup_daemon_process
  # Read our own PID into a local first, then signal it (see warning below).
  current_process_id = Process.pid
  # Signal 9 is SIGKILL on POSIX platforms.
  Process.kill(9, current_process_id)
  # DO NOT TRY TO REFACTOR IT THIS WAY
  # Calling Process.kill(9,Process.pid) kills the "stopper" process itself, rather than the one it's trying to stop.
  # NOTE(review): the warning above describes an inlined form that looks equivalent
  # to the two statements used here; the original rationale is not visible in this
  # file, so keep the two-step form unless that is confirmed.
end

#cleanup_empty_error_log_files ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 61

# Removes every zero-length `*.err` file from the `logs` directory located
# under `log_path` (or under the current working directory when no log path
# was configured). Files that contain data are kept.
#
# @return [Array<String>] the paths that matched the glob
def cleanup_empty_error_log_files
  base_dir = log_path || Dir.pwd

  Dir.glob("#{base_dir}/logs/*.err").each do |candidate|
    next unless File.exist?(candidate)
    # File.size? is nil for an empty file and the byte count otherwise.
    next if File.size?(candidate)

    File.delete(candidate)
  end
end

#fork_another_process ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 134

# Takes the next invocation off the queue and runs it in a forked child.
#
# Parent side: keeps the read end of a fresh pipe in @child_tracker under the
# child's PID and notifies the UI unless running in the background.
# Child side: points $stdout at the write end of the pipe, logs the start of
# the job, runs the invocation, and always terminates via exit(42) if control
# ever comes back.
def fork_another_process
  invocation = invocations.shift[:value] # Be sure to do this shift() in parent process
  # thing_that_reads_from_the_child, thing_that_writes_to_the_parent = IO.pipe
  child_reader, parent_writer = IO.pipe
  # Process.fork returns the child's PID in the parent and nil in the child.
  if (child_pid = Process.fork)
    # In parent with newly forked child
    # The parent only reads; closing its copy of the write end lets the reader
    # reach EOF once the child is done with it.
    parent_writer.close
    @child_tracker[child_pid] = { io: child_reader }
    @ui.child_forked(child_pid, invocation) unless run_in_background
  else
    # In child
    # The child only writes, so drop the inherited read end.
    child_reader.close
    # replace stdout with writer
    $stdout = parent_writer
    # NOTE(review): create_logs is defined outside this view; passing $stderr here
    # presumably attaches the child's stderr to its log file - confirm there.
    create_logs(Process.pid, nil, $stderr)

    begin
      create_logs(
        Process.pid,
        "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
      )
      runner_invocation(invocation)
    rescue StandardError => e
      # Report the failure on the child's stderr, then fall through to exit(42).
      $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
      $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
    end

    # should be unreachable but child MUST exit
    exit(42)
  end
end

#initiate_background_run ⇒ Object



46
47
48
49
50
51
52
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 46

# Detaches the current process so the run continues in the background.
#
# On a local Windows host this is not attempted; the method asks the UI to
# exit with :usage_error instead. Everywhere else it calls Process.daemon
# with both flags set to true (keep the working directory, keep the standard
# streams).
def initiate_background_run
  return Inspec::UI.new.exit(:usage_error) if Inspec.locally_windows?

  Process.daemon(true, true)
end

#kill_child_processes ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 71

# Sends SIGKILL to every tracked child and then blocks until they are gone.
#
# A failure to signal one child (for example because it has already exited)
# is reported on stderr and does not stop the remaining children from being
# signalled. Only StandardError is handled here, so Interrupt, SystemExit and
# other non-standard exceptions propagate to the caller instead of being
# swallowed during shutdown.
def kill_child_processes
  @child_tracker.each_key do |pid|
    Process.kill("SIGKILL", pid)
  rescue StandardError => e
    $stderr.puts "Error while shutting down process #{pid}: #{e.message}"
  end
  # Waiting for child processes to die after they have been killed
  wait_for_child_processes_to_die
end

#rename_error_log_files ⇒ Object



95
96
97
98
99
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 95

# Hands the recorded error-log file of every child that was ever registered in
# @child_tracker_persisted to `rename_error_log`, together with that child's PID.
#
# @return [Hash] @child_tracker_persisted
def rename_error_log_files
  @child_tracker_persisted.each_pair do |child_pid, record|
    log_file = record[:error_log_file]
    rename_error_log(log_file, child_pid)
  end
end

#run ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 23

# Main loop of the runner.
#
# Keeps starting children while capacity and queued invocations remain, polls
# their pipes, reaps the finished ones, and repeats until the queue is empty
# and no child is tracked any more. Log housekeeping follows, and a
# background run finally terminates its own daemon process.
def run
  # Daemonise before any child is started: becoming a daemon changes the PID
  # of the parent process.
  initiate_background_run if run_in_background

  while !invocations.empty? || !@child_tracker.empty?
    # Fill every free slot; Windows cannot fork, so it spawns instead.
    while should_start_more_jobs?
      Inspec.locally_windows? ? spawn_another_process : fork_another_process
    end

    update_ui_poll_select
    cleanup_child_processes
    sleep 0.1
  end

  # Renaming is needed on Windows only, and must wait until every child has
  # exited; the sweep of empty error logs happens on all platforms.
  rename_error_log_files if Inspec.locally_windows?
  cleanup_empty_error_log_files

  return unless run_in_background

  cleanup_daemon_process
end

#should_start_more_jobs? ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 101

# Tells whether another child may be started right now.
#
# @return [Boolean] true when fewer than `total_jobs` children are tracked and
#   at least one invocation is still queued
def should_start_more_jobs?
  return false unless @child_tracker.size < total_jobs

  !invocations.empty?
end

#spawn_another_process ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 105

# Takes the next invocation off the queue and runs it as a separate process
# via Process.spawn (used where fork is unavailable).
#
# The child's stdout is connected to a pipe whose read end is stored in
# @child_tracker; its stderr goes to a fresh `*.err` file in the `logs`
# directory under `log_path` (or the working directory). The error-log file
# object is also remembered in @child_tracker_persisted so it can be renamed
# after all children have exited.
#
# Any StandardError raised while setting the child up is reported on stderr;
# the invocation is not retried.
#
# @return [nil]
def spawn_another_process
  invocation = invocations.shift[:value]

  child_reader, parent_writer = IO.pipe
  error_log_file = nil
  begin
    logs_dir_path = log_path || Dir.pwd
    log_dir = File.join(logs_dir_path, "logs")
    FileUtils.mkdir_p(log_dir)
    error_log_file = File.open("#{log_dir}/#{Time.now.nsec}.err", "a+")
    cmd = "#{$0} #{sub_cmd} #{invocation}"
    log_msg = "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
    child_pid = Process.spawn(cmd, out: parent_writer, err: error_log_file.path)

    # Logging
    create_logs(child_pid, log_msg)
    @child_tracker[child_pid] = { io: child_reader }

    # This is used to rename error log files after all child processes are exited
    @child_tracker_persisted[child_pid] = { error_log_file: error_log_file }
    @ui.child_spawned(child_pid, invocation) unless run_in_background
  rescue StandardError => e
    # Nobody will ever read from a pipe that did not make it into the tracker.
    child_reader.close unless child_pid && @child_tracker.key?(child_pid)
    $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
    $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
  ensure
    # The child holds its own copy of the write end. Closing ours avoids leaking
    # a descriptor per job and lets the reader reach EOF when the child exits.
    parent_writer.close unless parent_writer.closed?
    # Close the file to unlock the error log files opened by processes
    error_log_file.close if error_log_file && !error_log_file.closed?
  end
  nil
end

#update_ui_poll_select(target_pid = nil) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 183

# Reads pending status lines from child pipes and dispatches them.
#
# @param target_pid [Integer, nil] when given, only that child's pipe is read
#   and it is read repeatedly until one of the loop's exit conditions hits;
#   when nil, every tracked pipe is polled and at most one line is taken from
#   each pipe that is ready
# @return [Object] nil when no pipe became readable within the timeout
def update_ui_poll_select(target_pid = nil)
  # Focus on one pid's pipe if specified, otherwise poll all pipes
  pipes_for_reading = target_pid ? [ @child_tracker[target_pid][:io] ] : @child_tracker.values.map { |i| i[:io] }
  # Next line is due to a race between the close() and the wait()... shouldn't need it, but it fixes the race.
  pipes_for_reading.reject!(&:closed?)
  # Wait at most 0.1s for any of the pipes to become readable.
  ready_pipes = IO.select(pipes_for_reading, [], [], 0.1)
  return unless ready_pipes

  # ready_pipes[0] is the list of IOs that are ready for reading.
  ready_pipes[0].each do |pipe_ready_for_reading|
    # If we weren't provided a PID, hackishly look up the pid from the matching IO.
    pid = target_pid || @child_tracker.keys.detect { |p| @child_tracker[p][:io] == pipe_ready_for_reading }
    begin
      while (update_line = pipe_ready_for_reading.readline) && !pipe_ready_for_reading.closed?
        if update_line =~ /EOF_MARKER/
          # The child signalled that it is done writing: close our end and stop.
          pipe_ready_for_reading.close
          break
        elsif update_line =~ /WARN/ || update_line =~ /ERROR/ || update_line =~ /INFO/
          # Lines that look like log output go to the log file, not to the UI.
          create_logs(
            pid,
            "#{Time.now.iso8601} Extra log: #{update_line}\n"
          )
          # NOTE(review): this break also ends a targeted (final) read, so lines that
          # follow a log-level line are not consumed by this call - confirm intended.
          break
        end
        update_ui_with_line(pid, update_line) unless run_in_background
        # Only pull one line if we are doing normal updates; slurp the whole file
        # if we are doing a final pull on a targeted PID
        break unless target_pid
      end
    rescue EOFError
      # On unix, readline throws an EOFError when we hit the end. On Windows, nothing apparently happens.
      pipe_ready_for_reading.close
      next
    end
  end
  # TODO: loop over ready_pipes[2] and handle errors?
end

#update_ui_with_line(pid, update_line) ⇒ Object



220
221
222
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 220

# Forwards one status line emitted by a child to the UI.
#
# @param pid [Integer] PID of the child that produced the line
# @param update_line [String] the line as read from the child's pipe
# @return [Object] whatever the UI's `child_status_update_line` returns
def update_ui_with_line(pid, update_line) = @ui.child_status_update_line(pid, update_line)

#wait_for_child_processes_to_die ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 81

# Blocks until no tracked child is left.
#
# Polls for any exited child without blocking and drops each reaped PID from
# @child_tracker, pausing one second only when nothing was ready to be reaped.
# When the operating system reports that no children exist at all, the
# remaining tracker entries are stale and are discarded so the loop ends.
# Any other StandardError is logged at debug level and ends the wait, because
# retrying a persistently failing call would spin forever.
def wait_for_child_processes_to_die
  until @child_tracker.empty?
    begin
      exited_pid = Process.waitpid(-1, Process::WNOHANG)
      if exited_pid && exited_pid > 0
        @child_tracker.delete(exited_pid)
      else
        # Nothing has exited yet; give the children time before polling again.
        sleep 1
      end
    rescue Errno::ECHILD
      # No child processes remain, so whatever is still tracked is already gone.
      Inspec::Log.info "Processes shutdown complete!"
      @child_tracker.clear
    rescue StandardError => e
      Inspec::Log.debug "Error while waiting for child processes to shutdown: #{e.message}"
      break
    end
  end
end