Class: Rubysh::Subprocess::PidAwareParallelIO
- Inherits:
-
ParallelIO
- Object
- ParallelIO
- Rubysh::Subprocess::PidAwareParallelIO
show all
- Defined in:
- lib/rubysh/subprocess/pid_aware_parallel_io.rb
Overview
We can’t actually rely on an EOF once our subprocess has died, since it may have forked and a child inherited the parent’s fds. (This happens, for example, when using SSH’s ControlPersist.)
E.g. try Rubysh.run('ruby', 'bad.rb', Rubysh.>) with:
fork {sleep 1000}
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from ParallelIO
#available_readers, #available_writers, #close, #consume_all_available, #on_read, #on_write, #read_available, #register_reader, #register_writer, #run, #run_select_loop, #write
Constructor Details
#initialize(readers, writers, subprocesses) ⇒ PidAwareParallelIO
readers/writers should be hashes mapping {io => name}
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 64
# Sets up a ParallelIO that also watches an internal "breaker" pipe, so that
# the class-level SIGCHLD handling can poke this instance.
#
# readers      - hash of readers passed on to ParallelIO; the caller's hash is
#                copied, never mutated.
# writers      - hash of writers passed on to ParallelIO unchanged.
# subprocesses - collection that #run_once polls to detect that every
#                subprocess has a status.
def initialize(readers, writers, subprocesses)
  @subprocesses = subprocesses
  @finalized = false
  @breaker_reader, @breaker_writer = IO.pipe

  # Add the read end of the breaker pipe (with a nil name) to a private copy
  # of the caller's readers.
  watched_readers = readers.dup
  watched_readers[@breaker_reader] = nil

  super(watched_readers, writers)
  register_subprocesses
end
|
Instance Attribute Details
#finalized ⇒ Object
Returns the value of attribute finalized.
61
62
63
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 61
# Reports whether this instance has been finalized: false after construction,
# true once #finalize_all has set the flag.
def finalized
  @finalized
end
|
Class Method Details
.deregister_parallel_io(parallel_io) ⇒ Object
28
29
30
31
32
33
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 28
# Removes +parallel_io+ from the class-level registry and, once the last
# registered instance is gone, restores the previous SIGCHLD handler.
#
# parallel_io - the instance previously handed to .register_parallel_io.
#
# Deregistering an instance that is not (or is no longer) registered is a
# no-op. In particular it never touches the SIGCHLD trap, so a stray or
# repeated call cannot reinstall a stale or never-saved handler.
def self.deregister_parallel_io(parallel_io)
  @pids_mutex.synchronize do
    next unless @parallel_ios.key?(parallel_io)

    @parallel_ios.delete(parallel_io)
    deregister_sigchld_handler if @parallel_ios.empty?
  end
end
|
.deregister_sigchld_handler ⇒ Object
57
58
59
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 57
# Restores the SIGCHLD handler that was in place before
# .register_sigchld_handler installed ours.
#
# Signal.trap treats a nil command as "ignore the signal". If no previous
# handler has been saved we therefore fall back to 'DEFAULT' (the same
# fallback .register_sigchld_handler uses) instead of silently ignoring
# SIGCHLD.
#
# Returns whatever Signal.trap returns (the handler that was replaced).
def self.deregister_sigchld_handler
  Signal.trap('CHLD', @old_sigchld_handler || 'DEFAULT')
end
|
.handle_sigchld ⇒ Object
35
36
37
38
39
40
41
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 35
# Pokes the breaker pipe of every registered instance. Invoked from the CHLD
# trap installed by .register_sigchld_handler.
#
# The registry is read without taking @pids_mutex (presumably because this
# runs in trap context -- confirm). Hash#values returns a fresh array, so the
# loop walks a snapshot of the writers.
#
# Returns the array of writers that were poked.
def self.handle_sigchld
  breaker_writers = @parallel_ios.values
  breaker_writers.each do |breaker_writer|
    trigger_breaker(breaker_writer)
  end
end
|
.register_parallel_io(parallel_io, breaker_writer) ⇒ Object
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 17
# Adds +parallel_io+ to the class-level registry, installing the SIGCHLD
# handler first if the registry was empty.
#
# parallel_io    - the instance to track (used as the registry key).
# breaker_writer - write end of that instance's breaker pipe.
#
# The breaker is poked once right after registering (presumably so that a
# SIGCHLD delivered before the registration is not missed -- confirm).
#
# Returns the result of .trigger_breaker.
def self.register_parallel_io(parallel_io, breaker_writer)
  @pids_mutex.synchronize do
    first_registration = @parallel_ios.empty?
    register_sigchld_handler if first_registration

    @parallel_ios[parallel_io] = breaker_writer
    trigger_breaker(breaker_writer)
  end
end
|
.register_sigchld_handler ⇒ Object
50
51
52
53
54
55
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 50
# Installs a CHLD trap that calls .handle_sigchld and remembers the handler
# it replaced in @old_sigchld_handler, so .deregister_sigchld_handler can put
# it back later. If Signal.trap reports no previous handler (nil), 'DEFAULT'
# is remembered instead.
#
# Returns the remembered handler.
def self.register_sigchld_handler
  previous_handler = Signal.trap('CHLD') { handle_sigchld }
  @old_sigchld_handler = previous_handler || 'DEFAULT'
end
|
.trigger_breaker(writer) ⇒ Object
43
44
45
46
47
48
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 43
# Best-effort wake-up: writes a single byte to +writer+ without blocking.
#
# writer - write end of a breaker pipe.
#
# Does nothing if the writer is already closed. Errno::EAGAIN, Errno::EPIPE
# and IOError raised along the way are deliberately swallowed.
#
# Returns the number of bytes written, or nil if nothing was written.
def self.trigger_breaker(writer)
  return if writer.closed?

  writer.write_nonblock('a')
rescue Errno::EAGAIN, Errno::EPIPE, IOError
  nil
end
|
Instance Method Details
#finalize_all ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 96
# Tears this instance down: closes the write end of the breaker pipe, calls
# consume_all_available (inherited from ParallelIO; presumably drains data
# that is still pending -- confirm), closes every remaining reader and
# writer, removes the instance from the class-level registry and finally
# marks it as finalized.
#
# Idempotent: once the instance is finalized, further calls return
# immediately instead of repeating the close/deregister sequence.
#
# Returns true.
def finalize_all
  return true if @finalized

  @breaker_writer.close
  consume_all_available
  available_readers.each(&:close)
  available_writers.each(&:close)
  self.class.deregister_parallel_io(self)
  @finalized = true
end
|
#register_subprocesses ⇒ Object
76
77
78
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 76
# Registers this instance and the write end of its breaker pipe with the
# class-level registry (see .register_parallel_io).
#
# Returns whatever .register_parallel_io returns.
def register_subprocesses
  registry = self.class
  registry.register_parallel_io(self, @breaker_writer)
end
|
#run_once(timeout = nil) ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 80
# Runs one iteration of the IO loop, unless there is nothing left to do.
#
# timeout - forwarded unchanged to ParallelIO#run_once (via bare super).
#
# Returns nil without doing anything once the instance is finalized. Otherwise
# every subprocess is asked to wait(true) (presumably a non-blocking wait
# that refreshes its status -- confirm against Subprocess#wait). If all of
# them then report a status, the instance is finalized and nil is returned;
# if not, the call is delegated to the superclass and its result returned.
def run_once(timeout=nil)
  return if @finalized

  @subprocesses.each { |child| child.wait(true) }

  unless @subprocesses.all? { |child| child.status }
    return super
  end

  finalize_all
  nil
end
|