Class: RightScale::RightPopen::Accumulator
- Inherits: Object
- Inheritance chain: RightScale::RightPopen::Accumulator < Object
- Defined in:
- lib/right_popen/linux/accumulator.rb
Constant Summary collapse
- READ_CHUNK_SIZE = 4096
Instance Method Summary collapse
- #cleanup ⇒ Object
- #initialize(process, inputs, read_callbacks, outputs, write_callbacks) ⇒ Accumulator (constructor): A new instance of Accumulator.
- #number_waiting_on ⇒ Object
- #run_to_completion(sleep_time = 0.1) ⇒ Object
- #tick(sleep_time = 0.1) ⇒ Object
Constructor Details
#initialize(process, inputs, read_callbacks, outputs, write_callbacks) ⇒ Accumulator
Returns a new instance of Accumulator.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/right_popen/linux/accumulator.rb', line 29
#
# Builds an accumulator that shuttles data between a child process's pipes
# and caller-supplied callbacks.
#
# process         - object exposing +pid+, +status+ and +status=+ (these are
#                   the only members the methods of this class use).
# inputs          - descriptors to read from; stored by reference, so
#                   descriptors removed later also disappear from the
#                   caller's array.
# read_callbacks  - callables paired positionally with +inputs+; each is
#                   invoked with every chunk read from its descriptor. A nil
#                   (or missing) entry means the data is read and discarded.
# outputs         - descriptors to write to; stored by reference.
# write_callbacks - callables paired positionally with +outputs+; each is
#                   invoked with no arguments and returns the next string to
#                   write, or nil to have the descriptor closed.
def initialize(process, inputs, read_callbacks, outputs, write_callbacks)
  @process = process
  @inputs = inputs
  @outputs = outputs

  # descriptor => callback lookup tables; zip pads missing callbacks with nil.
  @reads = {}
  inputs.zip(read_callbacks) { |input, callback| @reads[input] = callback }

  @writes = {}
  outputs.zip(write_callbacks) { |output, callback| @writes[output] = callback }

  # descriptor => data accepted from a write callback but not yet written.
  @writebuffers = {}
  @status = nil
end
Instance Method Details
#cleanup ⇒ Object
103 104 105 106 107 |
# File 'lib/right_popen/linux/accumulator.rb', line 103
#
# Releases everything the accumulator still holds: closes each input and
# output descriptor that is not already closed, then, if no exit status has
# been recorded for the process yet, blocks in waitpid2 and stores its
# result as the process status.
def cleanup
  [@inputs, @outputs].each do |descriptors|
    descriptors.each do |io|
      io.close unless io.closed?
    end
  end

  return unless @process.status.nil?

  @process.status = ::Process.waitpid2(@process.pid)
end
#number_waiting_on ⇒ Object
99 100 101 |
# File 'lib/right_popen/linux/accumulator.rb', line 99
#
# Returns how many descriptors are still being tracked, i.e. the combined
# size of the input and output lists.
def number_waiting_on
  [@inputs, @outputs].inject(0) { |total, descriptors| total + descriptors.size }
end
#run_to_completion(sleep_time = 0.1) ⇒ Object
109 110 111 112 113 114 |
# File 'lib/right_popen/linux/accumulator.rb', line 109
#
# Drives #tick until the process has an exit status or there are no
# descriptors left to service, then drains any output still buffered in the
# input pipes and calls #cleanup.
#
# sleep_time - timeout in seconds handed to each #tick call.
#
# Returns whatever #cleanup returns.
def run_to_completion(sleep_time = 0.1)
  until tick(sleep_time)
    break if number_waiting_on == 0
  end

  # A tick reads at most one READ_CHUNK_SIZE chunk per descriptor, so when
  # the process exits there can still be unread data sitting in the pipes.
  # Deliver it before cleanup closes them. Reads are non-blocking so a pipe
  # kept open by some other process cannot stall us here.
  @inputs.each do |fdes|
    next if fdes.closed?

    callback = @reads[fdes]
    begin
      loop do
        chunk = fdes.read_nonblock(READ_CHUNK_SIZE)
        callback.call(chunk) if callback
      end
    rescue Errno::EINTR
      retry
    rescue EOFError, Errno::EAGAIN, Errno::EWOULDBLOCK
      # Fully drained, or nothing more is available right now.
    end
  end

  cleanup
end
#tick(sleep_time = 0.1) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/right_popen/linux/accumulator.rb', line 48
#
# Performs one round of work: polls the process for an exit status, waits up
# to +sleep_time+ seconds for any tracked descriptor to become ready, then
# services every ready descriptor once.
#
# * Readable descriptors: at EOF they are closed and dropped from the input
#   list; otherwise up to READ_CHUNK_SIZE bytes are read and handed to the
#   matching read callback (if one was registered).
# * Writable descriptors: leftover buffered data is written first; when the
#   buffer is empty the write callback is asked for more. A nil result
#   closes the descriptor and drops it from the output list.
#
# sleep_time - IO.select timeout in seconds.
#
# Returns true once an exit status has been recorded for the process,
# false otherwise.
def tick(sleep_time = 0.1)
  return true unless @process.status.nil?

  # Non-blocking reap: with WNOHANG, waitpid2 yields nil while the child is
  # still running.
  @process.status = ::Process.waitpid2(@process.pid, ::Process::WNOHANG)

  ready = nil
  unless @inputs.empty? && @outputs.empty?
    begin
      # in theory, we should note "exceptional conditions" and
      # permit procs for those, too. In practice there are only
      # two times when exceptional conditions occur: out of band
      # data in TCP connections and "packet mode" for
      # pseudoterminals. We care about neither of these,
      # therefore ignore exceptional conditions.
      #
      # A nil result means the timeout elapsed; return to the caller in that
      # case so the exit status is polled again on the next tick instead of
      # waiting here for as long as the pipes stay idle.
      ready = IO.select(@inputs, @outputs, nil, sleep_time)
    rescue Errno::EAGAIN, Errno::EINTR
      retry
    end
  end
  readable, writable = ready

  (readable || []).each do |fdes|
    if fdes.eof?
      fdes.close
      @inputs.delete(fdes)
    else
      chunk = fdes.readpartial(READ_CHUNK_SIZE)
      callback = @reads[fdes]
      callback.call(chunk) if callback
    end
  end

  (writable || []).each do |fdes|
    pending = @writebuffers[fdes]
    # Only ask the callback for more once the previous data is fully written.
    if (pending.nil? || pending.empty?) && @writes[fdes]
      pending = @writes[fdes].call
    end

    if pending.nil?
      fdes.close
      @outputs.delete(fdes)
    elsif !pending.empty?
      begin
        written = fdes.write_nonblock(pending)
        # write_nonblock reports bytes, so slice by bytes where possible to
        # keep multibyte strings intact after a partial write.
        @writebuffers[fdes] =
          pending.respond_to?(:byteslice) ? pending.byteslice(written..-1) : pending[written..-1]
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
        # Not writable after all; keep the data for the next tick.
        @writebuffers[fdes] = pending
      rescue Errno::EPIPE
        # subprocess closed the pipe; fine.
        fdes.close
        @outputs.delete(fdes)
      end
    end
  end

  !@process.status.nil?
end