Class: OctocatalogDiff::Util::Parallel

Inherits:
Object
  • Object
show all
Defined in:
lib/octocatalog-diff/util/parallel.rb

Defined Under Namespace

Classes: IncompleteTask, Result, Task

Class Method Summary collapse

Class Method Details

.execute_task(task, logger) ⇒ OctocatalogDiff::Util::Parallel::Result

Utility method! Not intended to be called from outside this class.


Process a single task. Called by run_tasks_parallel / run_tasks_serial. This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object itself, and not raise them.

Parameters:

Returns:


194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/octocatalog-diff/util/parallel.rb', line 194

def self.execute_task(task, logger)
  begin
    logger.debug("Begin #{task.description}")
    output = task.execute(logger)
    result = Result.new(output: output, status: true, args: task.args)
  rescue => exc
    logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
    # Immediately return without running the validation, since this already failed.
    return Result.new(exception: exc, status: false, args: task.args)
  end

  begin
    if task.validate(output, logger)
      logger.debug("Success #{task.description}")
    else
      # Preferably the validator method raised its own exception. However if it
      # simply returned false, raise our own exception here.
      raise "Failed #{task.description} validation (unspecified error)"
    end
  rescue => exc
    logger.warn("Failed #{task.description} validation: #{exc.class} #{exc.message}")
    result.status = false
    result.exception = exc
  end

  result
end

.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false) ⇒ Array<Parallel::Result>

Entry point for parallel processing. By default this will perform parallel processing, but it will also accept an option to do serial processing instead.

Parameters:

  • task_array (Array<Parallel::Task>)

    Tasks to run

  • logger (Logger) (defaults to: nil)

    Optional logger object

  • parallelized (Boolean) (defaults to: true)

    True for parallel processing, false for serial processing

  • raise_exception (Boolean) (defaults to: false)

    True to raise exception immediately if one occurs; false to return exception in results

Returns:

Raises:

  • (ArgumentError)

72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/octocatalog-diff/util/parallel.rb', line 72

def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
  # Create a throwaway logger object if one is not given
  logger ||= Logger.new(StringIO.new)

  # Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
  # return an empty array right away.
  raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
  return [] if task_array.empty?

  invalid_inputs = task_array.reject { |task| task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
  if invalid_inputs.any?
    ele = invalid_inputs.first
    raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
  end

  # Initialize the result array. For now all entries in the array indicate that the task was killed.
  # Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
  # the task was killed.
  result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
  logger.debug "Initialized parallel task result array: size=#{result.size}"

  # Execute as per the requested method (serial or parallel) and handle results.
  exception = parallelized ? run_tasks_parallel(result, task_array, logger) : run_tasks_serial(result, task_array, logger)
  raise exception if exception && raise_exception
  result
end

.run_tasks_parallel(result, task_array, logger) ⇒ Exception

Utility method! Not intended to be called from outside this class.


Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result) into a serialized data file. Once children are forked this method waits for their return, deserializing the output from each data file and updating the `result` array with actual results.

Parameters:

Returns:

  • (Exception)

    First exception encountered by a child process; returns nil if no exceptions encountered.


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/octocatalog-diff/util/parallel.rb', line 109

def self.run_tasks_parallel(result, task_array, logger)
  pidmap = {}
  ipc_tempdir = OctocatalogDiff::Util::Util.temp_dir('ocd-ipc-')

  # Child process forking
  task_array.each_with_index do |task, index|
    # simplecov doesn't see this because it's forked
    # :nocov:
    this_pid = fork do
      ENV['OCTOCATALOG_DIFF_TEMPDIR'] ||= ipc_tempdir
      task_result = execute_task(task, logger)
      File.open(File.join(ipc_tempdir, "#{Process.pid}.dat"), 'w') { |f| f.write Marshal.dump(task_result) }
      Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
    end
    # :nocov:

    pidmap[this_pid] = { index: index, start_time: Time.now }
    logger.debug "Launched pid=#{this_pid} for index=#{index}"
    logger.reopen if logger.respond_to?(:reopen)
  end

  # Waiting for children and handling results
  while pidmap.any?
    pidmap.each do |pid|
      status = Process.waitpid2(pid[0], Process::WNOHANG)
      next if status.nil?
      this_pid, exit_obj = status
      next unless this_pid && pidmap.key?(this_pid)
      index = pidmap[this_pid][:index]
      exitstatus = exit_obj.exitstatus
      raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
      raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

      input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
      result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
      time_delta = Time.now - pidmap[this_pid][:start_time]
      pidmap.delete(this_pid)

      logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"

      next if result[index].status
      return result[index].exception
    end
  end

  logger.debug 'All child processes completed with no exceptions raised'

# Cleanup: Kill any child processes that are still running, and clean the temporary directory
# where data files were stored.
ensure
  pidmap.each do |pid, _pid_data|
    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
      # If the process doesn't exist, that's fine.
    end
  end
end

.run_tasks_serial(result, task_array, logger) ⇒ Object

Utility method! Not intended to be called from outside this class.


Perform the tasks in serial.

Parameters:


174
175
176
177
178
179
180
181
182
183
184
# File 'lib/octocatalog-diff/util/parallel.rb', line 174

def self.run_tasks_serial(result, task_array, logger)
  # Perform the tasks 1 by 1 - each successful task will replace an element in the 'result' array,
  # whereas a failed task will replace the current element with an exception, and all later tasks
  # will not be replaced (thereby being populated with the cancellation error).
  task_array.each_with_index do |ele, task_counter|
    result[task_counter] = execute_task(ele, logger)
    next if result[task_counter].status
    return result[task_counter].exception
  end
  nil
end