Class: MultipleCmd

Inherits:
Object
  • Object
show all
Defined in:
lib/mcmd.rb

Defined Under Namespace

Classes: SubProc

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMultipleCmd

Returns a new instance of MultipleCmd.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/mcmd.rb', line 13

# Builds a runner with empty bookkeeping tables and the default tuning values.
def initialize
  # Per-run bookkeeping; run resets these three once it has collected results.
  @subproc_by_pid = {}
  @subproc_by_fd = {}
  @processed_commands = []

  # Defaults, applied through the attribute writers in declaration order.
  defaults = [
    [:commands, []],
    [:perchild_timeout, 60],
    [:global_timeout, 0],
    [:maxflight, 200],
    [:debug, false],
    [:poll_period, 0.5],          # shouldn't need adjusting
    [:max_read_size, 512 * 1024], # 512k
  ]
  defaults.each { |attr, value| send("#{attr}=", value) }
end

Instance Attribute Details

#commandsObject

Returns the value of attribute commands.



9
10
11
# File 'lib/mcmd.rb', line 9

# Queue of commands still waiting to be started. initialize sets it to an
# empty array; run shifts entries off the front and hands each one to
# add_subprocess.
def commands
  @commands
end

#debugObject

Returns the value of attribute debug.



10
11
12
# File 'lib/mcmd.rb', line 10

# Debug switch, false by default. When truthy, run and process_read_fds
# print diagnostic lines with puts.
def debug
  @debug
end

#global_timeoutObject

Returns the value of attribute global_timeout.



9
10
11
# File 'lib/mcmd.rb', line 9

# Overall time limit setting; initialize sets it to 0.
# NOTE(review): nothing in this listing reads the value -- run records
# @global_time_start but never compares against it. Confirm whether it is
# enforced elsewhere.
def global_timeout
  @global_timeout
end

#max_read_sizeObject

Returns the value of attribute max_read_size.



11
12
13
# File 'lib/mcmd.rb', line 11

# Intended byte limit for a subprocess's captured output; initialize sets it
# to 2 ** 19 (512k).
# NOTE(review): enforcement in process_read_fds is still marked FIXME, so do
# not rely on this as a hard cap without confirming.
def max_read_size
  @max_read_size
end

#maxflightObject

Returns the value of attribute maxflight.



9
10
11
# File 'lib/mcmd.rb', line 9

# Maximum number of subprocesses run keeps in flight at once: it only starts
# another command while @subproc_by_pid holds fewer entries than this.
# initialize sets it to 200.
def maxflight
  @maxflight
end

#perchild_timeoutObject

Returns the value of attribute perchild_timeout.



9
10
11
# File 'lib/mcmd.rb', line 9

# Per-subprocess time limit; initialize sets it to 60. process_timeouts
# compares it with Time.now.to_i minus the subprocess's time_start
# (presumably whole seconds -- confirm how SubProc sets time_start) and
# treats a value that is not greater than 0 as "no limit".
def perchild_timeout
  @perchild_timeout
end

#poll_periodObject

Returns the value of attribute poll_period.



11
12
13
# File 'lib/mcmd.rb', line 11

# Timeout that service_subprocess_io passes to IO.select_using_poll.
# initialize sets it to 0.5.
def poll_period
  @poll_period
end

#verboseObject

Returns the value of attribute verbose.



11
12
13
# File 'lib/mcmd.rb', line 11

# Verbosity setting.
# NOTE(review): initialize does not set it and nothing in this listing reads
# it, so it is nil unless a caller assigns it -- confirm intended use.
def verbose
  @verbose
end

#yield_proc_timeoutObject

Returns the value of attribute yield_proc_timeout.



10
11
12
# File 'lib/mcmd.rb', line 10

# Optional callback. When non-nil, process_timeouts invokes it via
# call(subproc) for an overdue subprocess, right before that subprocess is
# handed to kill_process.
def yield_proc_timeout
  @yield_proc_timeout
end

#yield_startcmdObject

Returns the value of attribute yield_startcmd.



10
11
12
# File 'lib/mcmd.rb', line 10

# Optional callback. When non-nil, add_subprocess invokes it via
# call(subproc) in the parent process after the fork, once the new
# subprocess has been registered.
def yield_startcmd
  @yield_startcmd
end

#yield_waitObject

Returns the value of attribute yield_wait.



10
11
12
# File 'lib/mcmd.rb', line 10

# Optional callback. When non-nil, wait invokes it via call(subproc) for
# every subprocess it has just reaped, after collecting leftover output.
def yield_wait
  @yield_wait
end

Instance Method Details

#add_subprocess(cmd) ⇒ Object

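Forks a child process for cmd, connects its stdin, stdout and stderr to pipes, and registers the new SubProc in the pid and fd lookup tables. Author's note: this whole method should probably move into SubProc, with the subproc_by_* tables becoming class variables.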



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/mcmd.rb', line 40

# Starts +cmd+ in a forked child whose stdin/stdout/stderr are connected to
# pipes, and registers the resulting SubProc in the lookup tables.
#
# cmd - the command, passed unchanged to noshell_exec in the child.
#
# In the parent: the child-side pipe ends are closed, the SubProc is stored
# in @subproc_by_pid and, keyed by each parent-side pipe end, in
# @subproc_by_fd, and yield_startcmd (if set) is called with the SubProc.
#
# In the child: the standard streams are redirected and cmd is exec'd. The
# child never returns from this method; if the exec fails it reports the
# error on its (redirected) stderr and exits with status 127.
def add_subprocess(cmd)
  stdin_rd, stdin_wr = IO.pipe
  stdout_rd, stdout_wr = IO.pipe
  stderr_rd, stderr_wr = IO.pipe
  subproc = MultipleCmd::SubProc.new
  subproc.stdin_fd = stdin_wr
  subproc.stdout_fd = stdout_rd
  subproc.stderr_fd = stderr_rd
  subproc.command = cmd

  pid = fork
  if pid.nil?
    # child
    begin
      # setup stdin, out, err
      STDIN.reopen(stdin_rd)
      STDOUT.reopen(stdout_wr)
      STDERR.reopen(stderr_wr)
      noshell_exec(cmd)
    rescue StandardError => ex
      STDERR.write("exec failed for #{cmd.inspect}: #{ex.class}: #{ex.message}\n")
    ensure
      # Only reached when the exec did not happen. Leave immediately, without
      # at_exit handlers, so the child can never unwind into (and keep
      # executing) a copy of the parent's call stack.
      exit!(127)
    end
  end

  # parent
  # The child owns these ends now. Holding them open here would leak three
  # descriptors per command and keep the read ends from ever reaching EOF.
  [stdin_rd, stdout_wr, stderr_wr].each { |io| io.close }

  # for mapping to subproc by pid
  subproc.pid = pid
  @subproc_by_pid[pid] = subproc
  # for mapping to subproc by i/o handle (returned from select); only the
  # parent-side ends can ever be returned
  @subproc_by_fd[stdin_wr] = subproc
  @subproc_by_fd[stdout_rd] = subproc
  @subproc_by_fd[stderr_rd] = subproc

  started = yield_startcmd
  started.call(subproc) unless started.nil?
end

#kill_process(p) ⇒ Object



156
157
158
159
160
161
162
163
164
# File 'lib/mcmd.rb', line 156

# Unregisters a subprocess's pipe handles and then kills it.
#
# p - the subprocess object; must respond to stdin_fd, stdout_fd, stderr_fd
#     and kill.
#
# Returns whatever p.kill returns.
def kill_process(p)
  # The @subproc_by_pid entry is left alone on purpose: it has to stay until
  # the child has been waited on.
  [p.stdin_fd, p.stdout_fd, p.stderr_fd].each do |fd|
    @subproc_by_fd.delete(fd)
  end
  # Kill last: kill closes the fds, so they must already be out of the map.
  p.kill
end

#noshell_exec(cmd) ⇒ Object



29
30
31
32
33
34
35
# File 'lib/mcmd.rb', line 29

# Replaces the current process with +cmd+ without going through a shell.
#
# cmd - the program name followed by its arguments. The program name is
#       passed to Kernel.exec in the two-element [name, argv0] form, which
#       is what keeps a lone command from being interpreted by a shell.
#
# Does not return when the exec succeeds.
def noshell_exec(cmd)
  program = cmd[0]
  arguments = cmd.length == 1 ? [] : cmd[1..-1]
  Kernel.exec([program, program], *arguments)
end

#process_err_fds(err_fds) ⇒ Object



128
129
# File 'lib/mcmd.rb', line 128

# Placeholder for handling fds reported in the error set of the poll call.
# service_subprocess_io calls it with the third element of the poll result
# whenever that element is non-nil; the fds are currently ignored.
#
# err_fds - the error-set fds from the poll result.
def process_err_fds(err_fds)
end

#process_read_fds(read_fds) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/mcmd.rb', line 76

# Reads one chunk (up to 4096 bytes) from every fd in +read_fds+ and appends
# it to the owning subprocess's stdout_buf or stderr_buf.
#
# read_fds - readable IO objects, each of which must be a key of
#            @subproc_by_fd and be the stdout_fd or stderr_fd of the
#            subprocess it maps to.
#
# When a read raises EOFError or a SystemCallError the stream is finished:
# the matching stdout_fd/stderr_fd on the subprocess is set to nil, the fd is
# dropped from @subproc_by_fd and it is closed.
#
# Raises RuntimeError for an fd that is not in @subproc_by_fd, or one that
# is mapped to a subprocess but is neither its stdout nor its stderr.
def process_read_fds(read_fds)
  read_fds.each do |fd|
    unless @subproc_by_fd.has_key?(fd)
      raise "Select returned a fd which I have not seen! fd: #{fd.inspect}"
    end
    subproc = @subproc_by_fd[fd]

    # if we're reading, it was the process's stdout or stderr
    from_stdout = (fd == subproc.stdout_fd)
    unless from_stdout || fd == subproc.stderr_fd
      # Fail before reading so no bytes are consumed and silently dropped.
      raise "impossible: operating on a subproc where the fd isn't found, even though it's mapped"
    end

    begin
      chunk = fd.sysread(4096)
      # no exception? bytes were read. append them.
      # FIXME if we've read > max_read_size, allow closing/ignoring the fd
      # instead of a hard kill (needs a mark-to-kill mechanism; the limit is
      # not enforced yet)
      if from_stdout
        subproc.stdout_buf << chunk
      else
        subproc.stderr_buf << chunk
      end
    rescue SystemCallError, EOFError => ex
      puts "DEBUG: saw read exception #{ex}" if debug
      # finalize read i/o: clear out the read fd for this subproc ...
      if from_stdout
        subproc.stdout_fd = nil
      else
        subproc.stderr_fd = nil
      end
      # ... and stop tracking it, so the closed IO is not kept alive as a
      # hash key for the rest of the run.
      @subproc_by_fd.delete(fd)
      begin
        fd.close
      rescue IOError, SystemCallError
        # already closed or unusable; there is nothing left to release
      end
    end
  end
end

#process_timeoutsObject



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/mcmd.rb', line 144

# Kills every running subprocess that has exceeded perchild_timeout.
#
# A subprocess is overdue when Time.now.to_i minus its time_start is greater
# than perchild_timeout. For each overdue subprocess yield_proc_timeout (if
# set) is called with it, and it is then passed to kill_process.
#
# Nothing happens when perchild_timeout is nil or not greater than 0.
def process_timeouts
  limit = perchild_timeout
  return unless limit && limit > 0

  now = Time.now.to_i
  on_timeout = yield_proc_timeout
  @subproc_by_pid.values.each do |p|
    # A killed subprocess stays in @subproc_by_pid until it has been waited
    # on. Skip it so the callback and the kill are not repeated on every pass
    # (same filter service_subprocess_io and wait use; presumably `terminated`
    # is set by SubProc#kill -- confirm).
    next if p.terminated
    next unless (now - p.time_start) > limit

    # expire this child process
    on_timeout.call(p) unless on_timeout.nil?
    kill_process(p)
  end
end

#process_write_fds(write_fds) ⇒ Object

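Checks that every writable fd returned by the poll is known to the fd lookup table; writing to a child's stdin is not implemented yet. (The text originally shown here, "process_read_fds()", appears to be the end-of-method marker of the preceding method, picked up as this method's doc comment.)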



120
121
122
123
124
125
126
127
# File 'lib/mcmd.rb', line 120

# Validates the writable fds returned by the poll call.
#
# write_fds - fds reported as writable; each must be a key of @subproc_by_fd.
#
# Writing to a child's stdin is not implemented (todo, not a core feature),
# so a known fd is simply skipped.
#
# Raises RuntimeError for an fd that is not in @subproc_by_fd.
def process_write_fds(write_fds)
  write_fds.each do |fd|
    next if @subproc_by_fd.key?(fd)

    raise "working on an unknown fd #{fd}"
  end
end

#return_rundataObject



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/mcmd.rb', line 200

# Summarizes every processed command as a plain hash.
#
# Returns an array with one hash per entry of @processed_commands, keyed by
# :pid, :write_buf_position, :stdout_buf, :stderr_buf, :command,
# :time_start, :time_end and :retval.
def return_rundata
  fields = [:pid, :write_buf_position, :stdout_buf, :stderr_buf,
            :command, :time_start, :time_end, :retval]
  # FIXME pass through the process object
  @processed_commands.map do |c|
    fields.each_with_object({}) do |field, row|
      row[field] = c.public_send(field)
    end
  end
end

#runObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/mcmd.rb', line 166

# Runs every queued command and returns the collected results.
#
# Each pass of the main loop starts commands until maxflight are in flight
# or the queue is empty, then services pipe i/o, expires overdue children
# and reaps finished ones. The loop ends once nothing is in flight and
# nothing is queued.
#
# Returns the value of return_rundata, taken before the per-run bookkeeping
# is reset for the next run.
def run
  @global_time_start = Time.now.to_i
  loop do
    # top up to maxflight processes from the pending queue
    while @subproc_by_pid.length < maxflight && !@commands.empty?
      add_subprocess(@commands.shift)
    end

    service_subprocess_io # service running processes
    process_timeouts      # timeout overdue processes
    wait                  # service process cleanup

    puts "have #{@subproc_by_pid.length} left to go" if debug

    # no active pid and nothing pending on the input list: we're done
    break if @subproc_by_pid.length.zero? && @commands.empty?
  end

  results = return_rundata
  # these are re-initialized after every run
  @subproc_by_pid = {}
  @subproc_by_fd = {}
  @processed_commands = []
  results
end

#service_subprocess_ioObject

Polls the stdin, stdout and stderr pipes of the running child processes and services whichever ones are ready, collecting their output.



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/mcmd.rb', line 132

# Polls the pipes of all non-terminated subprocesses and dispatches the
# ready ones.
#
# The stdin pipes are offered for writing and the stdout/stderr pipes for
# reading (nil handles are left out); the poll waits at most poll_period.
# Each non-nil element of the poll result is handed to process_read_fds,
# process_write_fds and process_err_fds respectively.
def service_subprocess_io
  live = @subproc_by_pid.values.reject { |sp| sp.terminated }
  stdin_pipes = live.map { |sp| sp.stdin_fd }.compact
  output_pipes = live.flat_map { |sp| [sp.stdout_fd, sp.stderr_fd].compact }

  ready = IO.select_using_poll(output_pipes, stdin_pipes, nil, poll_period)
  readable, writable, errored = ready

  process_read_fds(readable) unless readable.nil?
  process_write_fds(writable) unless writable.nil?
  process_err_fds(errored) unless errored.nil?
  # errors?
end

#waitObject



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/mcmd.rb', line 218

# Reaps every child process that has exited, without blocking.
#
# Each reaped child that is known to @subproc_by_pid gets time_end and retval
# (its Process::Status) set, is removed from @subproc_by_pid and appended to
# @processed_commands. Output still buffered in its pipes is then collected
# through process_read_fds, and finally yield_wait (if set) is called with
# each reaped subprocess.
#
# Returns the array of subprocesses reaped by this call.
def wait
  just_reaped = []
  loop do
    begin
      pid, status = Process.wait2(-1, Process::WNOHANG)
    rescue Errno::ECHILD
      # no children at all
      break
    end
    break if pid.nil? # children exist, but none has exited yet

    # pid is now gone. remove from subproc_by_pid and
    # add to the processed commands list
    subproc = @subproc_by_pid.delete(pid)
    if subproc.nil?
      # Waiting on -1 also returns children this object did not start.
      puts "DEBUG: reaped unknown child pid #{pid}" if debug
      next
    end
    subproc.time_end = Time.now.to_i
    subproc.retval = status
    @processed_commands << subproc
    just_reaped << subproc
  end

  # We may have waited on a child before reading all its output. Collect
  # those missing bits. No blocking. One process_read_fds pass reads a single
  # chunk per fd, so repeat until nothing is immediately readable. The pass
  # count is bounded so a surviving grandchild that keeps writing into an
  # inherited pipe cannot hold the loop forever.
  1024.times do
    pending = just_reaped.reject { |sp| sp.terminated }.flat_map do |sp|
      [sp.stdout_fd, sp.stderr_fd].compact
    end
    break if pending.empty?

    ready, = IO.select_using_poll(pending, nil, nil, 0)
    break if ready.nil? || ready.empty?

    process_read_fds(ready)
  end

  on_reaped = yield_wait
  just_reaped.each { |sp| on_reaped.call(sp) } unless on_reaped.nil?
  just_reaped
end