Class: Ruptr::Runner::Forking

Inherits:
Parallel
Defined in:
lib/ruptr/runner.rb

Defined Under Namespace

Classes: Master, Worker

Instance Attribute Summary

Attributes inherited from Parallel

#parallel_jobs

Attributes inherited from Ruptr::Runner

#capture_output, #golden_store, #timing_store

Instance Method Summary

Methods inherited from Parallel

default_parallel_jobs, #initialize

Methods inherited from Ruptr::Runner

class_from_env, #expected_processor_time, find_runner, from_env, #initialize, opts_from_env, #run_report, #run_sink

Constructor Details

This class inherits a constructor from Ruptr::Runner::Parallel

Instance Method Details

#dispatch(ts, sink) ⇒ Object



393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/ruptr/runner.rb', line 393

# Runs every batch produced by #each_batch, spreading the test cases of each batch over
# worker processes that are tracked by a Master.
#
# NOTE: Each batch is processed while global state set up for that particular batch is
# active (see #yield_batches). A worker forked under one batch's state must therefore never
# be handed work from another batch; such workers are flagged as stale and discarded.
#
# @param ts forwarded to #each_batch (presumably the test suite -- confirm against caller)
# @param sink forwarded to Master.new, #each_batch and #dispatch_batch
# @return [Object] whatever the final #finish_workers call returns
def dispatch(ts, sink)
  master = Master.new(sink, [], [])
  max_live_workers = parallel_jobs
  each_batch(ts, sink) do |batch|
    test_cases = batch.test_cases
    max_batch_workers = workers_count_for_test_cases(test_cases)
    if max_batch_workers <= 1
      # Fall back to single-threaded serial execution.
      dispatch_batch(batch, sink)
      next
    end

    spawned = 0
    pending_indexes = (0...test_cases.size).to_a
    until pending_indexes.empty?
      # Top up the pool until either the global cap or this batch's own cap is reached.
      until master.all_workers.size >= max_live_workers || spawned >= max_batch_workers
        spawn_worker(master, batch, test_cases)
        spawned += 1
      end
      distribute_work(master, test_cases, pending_indexes)
      # Once everything has been handed out, do not wait on results here: leaving the loop
      # right away lets workers for the next batch be spawned first.
      unless pending_indexes.empty?
        process_responses(master)
        stale_idle_workers = master.free_workers.select { |worker| worker.stale }
        drop_workers(master, stale_idle_workers)
      end
    end

    # Every worker alive at this point was forked for this batch or an earlier one.
    master.all_workers.each { |worker| worker.stale = true }
    if batch.fork_overlap
      drop_workers(master, master.free_workers)
    else
      finish_workers(master)
    end
  end
  finish_workers(master)
ensure
  master.all_workers.each { |worker| worker.close }
  master.free_workers = nil
  master.all_workers = nil
end