Class: RightScale::RightPopen::Accumulator

Inherits:
Object
  • Object
show all
Defined in:
lib/right_popen/linux/accumulator.rb

Constant Summary collapse

READ_CHUNK_SIZE =
4096

Instance Method Summary collapse

Constructor Details

#initialize(process, inputs, read_callbacks, outputs, write_callbacks) ⇒ Accumulator

Returns a new instance of Accumulator.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/right_popen/linux/accumulator.rb', line 29

# Builds an accumulator that shuttles data between a child process's
# pipes and caller-supplied callbacks.
#
# === Parameters
# process:: object representing the child; other methods of this class
#           read its +pid+ and read/write its +status+
# inputs(Array):: IO objects to read from
# read_callbacks(Array):: callbacks paired with +inputs+ by position;
#                         each is called with a chunk of data read
# outputs(Array):: IO objects to write to
# write_callbacks(Array):: callbacks paired with +outputs+ by position;
#                          each is called with no arguments to obtain
#                          the next data to write
#
# When a callbacks array is shorter than its IO array, Array#zip pads
# the missing entries with nil, so those IOs end up with no callback.
def initialize(process, inputs, read_callbacks, outputs, write_callbacks)
  @process = process
  @inputs = inputs
  @outputs = outputs
  @reads = {}
  @writes = {}
  inputs.zip(read_callbacks).each do |input, callback|
    @reads[input] = callback
  end
  outputs.zip(write_callbacks).each do |output, callback|
    @writes[output] = callback
  end
  # Data obtained from a write callback but not yet fully written, per IO.
  @writebuffers = {}
  @status = nil
end

Instance Method Details

#cleanup ⇒ Object



103
104
105
106
107
# File 'lib/right_popen/linux/accumulator.rb', line 103

# Closes every input and output pipe that is still open, then, if the
# process status has not been recorded yet, waits (blocking) for the
# child and stores the result of Process.waitpid2 as its status.
def cleanup
  [@inputs, @outputs].each do |pipes|
    pipes.each { |pipe| pipe.close unless pipe.closed? }
  end
  return unless @process.status.nil?

  @process.status = ::Process.waitpid2(@process.pid)
end

#number_waiting_on ⇒ Object



99
100
101
# File 'lib/right_popen/linux/accumulator.rb', line 99

# Counts the pipes still being tracked: the combined size of the
# current input and output collections.
def number_waiting_on
  [@inputs, @outputs].inject(0) { |count, pipes| count + pipes.size }
end

#run_to_completion(sleep_time = 0.1) ⇒ Object



109
110
111
112
113
114
# File 'lib/right_popen/linux/accumulator.rb', line 109

# Calls tick repeatedly until it reports completion or no pipes remain
# to wait on, then runs cleanup.
#
# === Parameters
# sleep_time:: value forwarded to every tick call
#
# === Return
# Whatever cleanup returns.
def run_to_completion(sleep_time=0.1)
  finished = false
  until finished
    finished = tick(sleep_time) || number_waiting_on == 0
  end
  cleanup
end

#tick(sleep_time = 0.1) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/right_popen/linux/accumulator.rb', line 48

# Performs one polling step: checks (without blocking) whether the child
# has exited, waits up to +sleep_time+ seconds for any tracked pipe to
# become ready, then services the ready pipes. At most one chunk is read
# from each readable input and one write is attempted on each writable
# output.
#
# === Parameters
# sleep_time(Numeric):: upper bound, in seconds, on the wait for pipe
#                       readiness; passed to IO.select as its timeout
#
# === Return
# true:: the process status is known
# false:: the process status is still unknown; call tick again
def tick(sleep_time = 0.1)
  return true unless @process.status.nil?

  # WNOHANG makes this a poll: the result is nil while the child runs.
  @process.status = ::Process.waitpid2(@process.pid, ::Process::WNOHANG)

  # Select on snapshots so the handlers below can delete from
  # @inputs/@outputs freely.
  inputs = @inputs.dup
  outputs = @outputs.dup
  ready = nil
  unless inputs.empty? && outputs.empty?
    begin
      # in theory, we should note "exceptional conditions" and
      # permit procs for those, too.  In practice there are only
      # two times when exceptional conditions occur: out of band
      # data in TCP connections and "packet mode" for
      # pseudoterminals.  We care about neither of these,
      # therefore ignore exceptional conditions.
      #
      # IO.select returns nil on timeout. That is a normal outcome, not
      # something to retry here: returning lets the next tick re-poll
      # the child's exit status instead of waiting on idle pipes forever.
      ready = IO.select(inputs, outputs, nil, sleep_time)
    rescue Errno::EAGAIN, Errno::EINTR
      # Interrupted before anything became ready; the next tick retries.
    end
  end
  readable, writable = ready

  Array(readable).each do |fdes|
    if fdes.eof?
      fdes.close
      @inputs.delete(fdes)
    else
      chunk = fdes.readpartial(READ_CHUNK_SIZE)
      callback = @reads[fdes]
      callback.call(chunk) if callback
    end
  end

  Array(writable).each do |fdes|
    buffered = @writebuffers[fdes]
    # Ask the callback for more data only once the previous chunk has
    # been written out completely.
    if buffered.nil? || buffered.empty?
      producer = @writes[fdes]
      buffered = producer.call if producer
    end
    if buffered.nil?
      # nil means there is nothing more to send on this pipe.
      fdes.close
      @outputs.delete(fdes)
      @writebuffers.delete(fdes)
    elsif !buffered.empty?
      begin
        amount = fdes.write_nonblock(buffered)
        # write_nonblock reports BYTES written, so the unsent remainder
        # must be cut by byte offset; character indexing would drop or
        # corrupt multibyte data after a partial write.
        @writebuffers[fdes] = buffered.byteslice(amount..-1)
      rescue Errno::EAGAIN, Errno::EINTR
        # Not actually writable right now; keep the chunk for next tick.
        @writebuffers[fdes] = buffered
      rescue Errno::EPIPE
        # subprocess closed the pipe; fine.
        fdes.close
        @outputs.delete(fdes)
        @writebuffers.delete(fdes)
      end
    end
  end

  !@process.status.nil?
end