Class: ActiveSupport::Testing::Parallelization::Server

Inherits:
Object
  • Object
show all
Includes:
DRb::DRbUndumped
Defined in:
lib/active_support/testing/parallelization/server.rb

Instance Method Summary

Constructor Details

#initialize ⇒ Server

Returns a new instance of Server.



14
15
16
17
18
19
# File 'lib/active_support/testing/parallelization/server.rb', line 14

# Sets up the server's empty state: one queue plus three concurrent maps.
def initialize
  # Entries added by `<<` and handed out by `pop`.
  @queue = Queue.new
  # worker_id => true, maintained by `start_worker` / `stop_worker`.
  @active_workers = Concurrent::Map.new
  # worker_id => worker_pid, used by `remove_dead_workers` to find a worker by pid.
  @worker_pids = Concurrent::Map.new
  # [class name, test name] => entry handed out by `pop` and not yet passed to `record`.
  @in_flight = Concurrent::Map.new
end

Instance Method Details

#<<(o) ⇒ Object



32
33
34
35
# File 'lib/active_support/testing/parallelization/server.rb', line 32

# Enqueues an entry for `pop` to hand out.
#
# A truthy entry has its third element (the one `shutdown` treats as the
# reporter) replaced, in place, by a DRbObject wrapping it. A nil or false
# entry is pushed untouched.
#
# @param o entry to enqueue; indexable at 2 when truthy
# @return the queue
def <<(o)
  if o
    reporter = o[2]
    o[2] = DRbObject.new(reporter)
  end

  @queue.push(o)
end

#active_workers? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/active_support/testing/parallelization/server.rb', line 64

# Reports whether any worker is currently registered.
#
# Entries are added by `start_worker` and removed by `stop_worker` or
# `remove_dead_workers`.
#
# @return [Boolean] true when @active_workers holds at least one entry
def active_workers?
  @active_workers.size.positive?
end

#interrupt ⇒ Object



68
69
70
# File 'lib/active_support/testing/parallelization/server.rb', line 68

# Discards every entry still waiting in the queue.
#
# Entries already handed out by `pop` are unaffected: they remain in
# @in_flight until `record` removes them.
def interrupt
  @queue.clear
end

#pop ⇒ Object



37
38
39
40
41
42
# File 'lib/active_support/testing/parallelization/server.rb', line 37

# Takes the next entry off the queue and remembers it as in flight.
#
# The in-flight key is [first element as a String, second element]. `record`
# removes that key again once a result arrives.
#
# @return the dequeued entry, or nil when the queue yields nil/false
def pop
  test = @queue.pop
  return unless test

  in_flight_key = [test[0].to_s, test[1]]
  @in_flight[in_flight_key] = test
  test
end

#record(reporter, result) ⇒ Object

Raises:

  • (DRb::DRbConnError)


21
22
23
24
25
26
27
28
29
30
# File 'lib/active_support/testing/parallelization/server.rb', line 21

# Accepts a finished result and forwards it to the reporter.
#
# The matching in-flight entry is dropped first, so `shutdown` will not
# report it as missing. The prerecord and record calls are both made inside
# the reporter's `synchronize` block.
#
# @param reporter object responding to synchronize, prerecord and record
# @param result object responding to klass and name
# @raise [DRb::DRbConnError] when result is a DRb::DRbUnknown
def record(reporter, result)
  raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)

  in_flight_key = [result.klass, result.name]
  @in_flight.delete(in_flight_key)

  reporter.synchronize do
    prerecorded_class = PrerecordResultClass.new(result.klass)
    reporter.prerecord(prerecorded_class, result.name)
    reporter.record(result)
  end
end

#remove_dead_workers(dead_pids) ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/active_support/testing/parallelization/server.rb', line 54

# Unregisters every worker whose pid appears in +dead_pids+.
#
# Each pid is mapped back to its worker id through @worker_pids. Pids with
# no registered worker are skipped.
#
# @param dead_pids collection of pids, iterated with each
# @return dead_pids, as returned by each
def remove_dead_workers(dead_pids)
  dead_pids.each do |pid|
    id = @worker_pids.key(pid)
    next unless id

    @active_workers.delete(id)
    @worker_pids.delete(id)
  end
end

#shutdown ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/active_support/testing/parallelization/server.rb', line 72

# Drains and closes the queue, waits for the workers, then reports every
# test that was handed out but never recorded.
#
# Both waits poll every 0.1 seconds. Each entry still in @in_flight is
# reported to its reporter as an unexpected RuntimeError with the message
# "result not reported".
def shutdown
  # Wait for the initial queue to drain.
  sleep 0.1 until @queue.empty?

  @queue.close

  # Wait until every worker has unregistered.
  sleep 0.1 while active_workers?

  # Iterate over a snapshot of the values, as the original does.
  @in_flight.values.each do |klass, name, reporter|
    unreported = RuntimeError.new("result not reported")
    unreported.set_backtrace([""])

    result = Minitest::Result.from(klass.new(name))
    result.failures << Minitest::UnexpectedError.new(unreported)

    reporter.synchronize { reporter.record(result) }
  end
end

#start_worker(worker_id, worker_pid) ⇒ Object



44
45
46
47
# File 'lib/active_support/testing/parallelization/server.rb', line 44

# Registers a worker: marks +worker_id+ as active and stores its pid so that
# `remove_dead_workers` can later look the worker up by pid.
#
# @param worker_id key used in both @active_workers and @worker_pids
# @param worker_pid pid stored for the worker
def start_worker(worker_id, worker_pid)
  @active_workers[worker_id] = true
  @worker_pids[worker_id] = worker_pid
end

#stop_worker(worker_id, worker_pid) ⇒ Object



49
50
51
52
# File 'lib/active_support/testing/parallelization/server.rb', line 49

# Unregisters a worker by removing +worker_id+ from both @active_workers and
# @worker_pids.
#
# @param worker_id key that was passed to `start_worker`
# @param worker_pid accepted but not used by this method
def stop_worker(worker_id, worker_pid)
  @active_workers.delete(worker_id)
  @worker_pids.delete(worker_id)
end