Class: Rubysh::Subprocess::PidAwareParallelIO

Inherits:
ParallelIO
  • Object
show all
Defined in:
lib/rubysh/subprocess/pid_aware_parallel_io.rb

Overview

We can’t actually rely on an EOF once our subprocess has died, since it may have forked and a child inherited the parent’s fds. (This happens, for example, when using SSH’s ControlPersist.)

E.g. try Rubysh.run('ruby', 'bad.rb', Rubysh.>) with:

# cat bad.rb
fork {sleep 1000}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ParallelIO

#available_readers, #available_writers, #close, #consume_all_available, #on_read, #on_write, #read_available, #register_reader, #register_writer, #run, #run_select_loop, #write

Constructor Details

#initialize(readers, writers, subprocesses) ⇒ PidAwareParallelIO

readers/writers should be hashes mapping IO object => name



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 64

# Sets up a ParallelIO that additionally watches an internal "breaker"
# pipe, then registers this instance with the class-level registry.
#
# readers      - hash of reader => name. It is copied, and the copy gains
#                the breaker pipe's read end (with a nil name) before
#                being handed to ParallelIO.
# writers      - hash of writer => name, passed to ParallelIO unchanged.
# subprocesses - the subprocess objects that run_once polls.
def initialize(readers, writers, subprocesses)
  @breaker_reader, @breaker_writer = IO.pipe
  @finalized = false
  @subprocesses = subprocesses

  # Extend a copy so the caller's hash is left untouched.
  watched_readers = readers.dup
  watched_readers[@breaker_reader] = nil
  super(watched_readers, writers)

  register_subprocesses
end

Instance Attribute Details

#finalizedObject (readonly)

Returns the value of attribute finalized.



61
62
63
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 61

# Returns the @finalized flag: false as set by initialize, true once
# finalize_all has run to completion. run_once returns immediately while
# the flag is true.
def finalized
  @finalized
end

Class Method Details

.deregister_parallel_io(parallel_io) ⇒ Object



28
29
30
31
32
33
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 28

# Removes parallel_io from the class-level registry and, when that leaves
# the registry empty, restores the previous SIGCHLD handler.
#
# parallel_io - the instance previously passed to register_parallel_io.
#
# Deregistering an instance that is not (or is no longer) registered is a
# no-op. Without that guard, a repeated call against an already-empty
# registry would re-install the saved handler a second time, clobbering
# whatever CHLD handler had been set in the meantime.
def self.deregister_parallel_io(parallel_io)
  @pids_mutex.synchronize do
    next unless @parallel_ios.key?(parallel_io)

    @parallel_ios.delete(parallel_io)
    deregister_sigchld_handler if @parallel_ios.empty?
  end
end

.deregister_sigchld_handlerObject



57
58
59
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 57

# Puts back the CHLD handler recorded in @old_sigchld_handler.
#
# Falls back to 'DEFAULT' when no previous handler has been recorded:
# Signal.trap treats a nil command as "ignore the signal", so passing nil
# through would silently switch SIGCHLD to IGNORE rather than restore
# anything.
def self.deregister_sigchld_handler
  Signal.trap('CHLD', @old_sigchld_handler || 'DEFAULT')
end

.handle_sigchldObject



35
36
37
38
39
40
41
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 35

# SIGCHLD callback: pokes the breaker pipe of every registered selector so
# that each one wakes up.
#
# Racing against other threads is acceptable here. Waking every active
# selector could in theory produce a thundering herd, but defending
# against that is probably not worth the effort.
def self.handle_sigchld
  # Iterate over a snapshot of the registered breaker writers.
  breaker_writers = @parallel_ios.values
  breaker_writers.each do |breaker_writer|
    trigger_breaker(breaker_writer)
  end
end

.register_parallel_io(parallel_io, breaker_writer) ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 17

# Adds parallel_io to the class-level registry, installing the SIGCHLD
# handler first when the registry was empty.
#
# parallel_io    - the instance to track.
# breaker_writer - write end of that instance's breaker pipe; it is what
#                  handle_sigchld pokes.
def self.register_parallel_io(parallel_io, breaker_writer)
  @pids_mutex.synchronize do
    first_registration = @parallel_ios.empty?
    register_sigchld_handler if first_registration
    @parallel_ios[parallel_io] = breaker_writer

    # A SIGCHLD may have been handled before the writer was stored above,
    # so poke the breaker once ourselves to cover that window.
    trigger_breaker(breaker_writer)
  end
end

.register_sigchld_handlerObject



50
51
52
53
54
55
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 50

# Installs handle_sigchld as the CHLD handler and remembers the handler it
# replaced in @old_sigchld_handler.
def self.register_sigchld_handler
  previous_handler = Signal.trap('CHLD') { handle_sigchld }
  # MRI reports a DEFAULT handler as nil, yet interprets nil as IGNORE when
  # it is passed back to trap, so record 'DEFAULT' explicitly in that case.
  @old_sigchld_handler = previous_handler || 'DEFAULT'
end

.trigger_breaker(writer) ⇒ Object



43
44
45
46
47
48
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 43

# Best-effort wake-up: writes a single byte to writer without blocking.
#
# writer - write end of a breaker pipe.
#
# Does nothing when the writer is already closed. Errno::EAGAIN,
# Errno::EPIPE and IOError are swallowed on purpose, in which case nil is
# returned.
def self.trigger_breaker(writer)
  return if writer.closed?

  writer.write_nonblock('a')
rescue Errno::EAGAIN, Errno::EPIPE, IOError
  nil
end

Instance Method Details

#finalize_allObject



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 96

# Tears this instance down: closes the breaker writer, drains whatever
# input is still pending, closes the remaining readers and writers,
# removes the instance from the class-level registry and finally marks it
# as finalized.
def finalize_all
  @breaker_writer.close

  # Once a process has exited, all of its bytes are guaranteed to be
  # available to us immediately, so a single drain picks everything up.
  consume_all_available

  available_readers.each(&:close)
  available_writers.each(&:close)
  self.class.deregister_parallel_io(self)

  @finalized = true
end

#register_subprocessesObject



76
77
78
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 76

# Registers this instance, together with the write end of its breaker
# pipe, in the class-level registry.
def register_subprocesses
  registry = self.class
  registry.register_parallel_io(self, @breaker_writer)
end

#run_once(timeout = nil) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/rubysh/subprocess/pid_aware_parallel_io.rb', line 80

# Runs one iteration of the select loop unless every subprocess has
# already exited.
#
# timeout - forwarded unchanged to ParallelIO#run_once.
#
# Returns nil when already finalized, or when this call finalized the
# instance; otherwise returns whatever the superclass implementation
# returns.
def run_once(timeout=nil)
  return if @finalized

  # Poll each subprocess; the true argument is passed straight to its wait.
  @subprocesses.each { |subprocess| subprocess.wait(true) }

  # Some subprocess has no status yet, so keep selecting.
  return super unless @subprocesses.all?(&:status)

  # All subprocesses have exited! We're done here.
  finalize_all
  nil
end