Class: MCMD::MparallelManager

Inherits:
Object
  • Object
show all
Defined in:
lib/nysol/mparallelmanager.rb

Direct Known Subclasses

MparallelManagerByFile

Instance Method Summary collapse

Constructor Details

#initialize(mp = 4, tim = -1) ⇒ MparallelManager

Returns a new instance of MparallelManager.



82
83
84
85
86
87
88
89
# File 'lib/nysol/mparallelmanager.rb', line 82

# Sets up the bookkeeping for a pool of parallel execution lanes.
#
# @param mp  [Integer] number of lanes; lane numbers 0...mp are queued as free
# @param tim [Numeric] interval stored in @thInterval; a Mutex is created
#   only when this value is greater than zero
def initialize(mp = 4, tim = -1)
  @mp         = mp                  # parallel size
  @thInterval = tim                 # check interval
  @runpid     = {}                  # pid => lane number, for running processes
  @slppid     = []                  # [pid, lane number, stopped pids] entries, for suspended processes
  @mtx        = Mutex.new if tim > 0
  @LaneQue    = Array.new(mp) { |lane_no| lane_no }
end

Instance Method Details

#addPid(pid, lane) ⇒ Object

実行PID=>lane登録



160
161
162
163
164
165
166
# File 'lib/nysol/mparallelmanager.rb', line 160

# Registers a running process: stores pid => lane in @runpid.
#
# The write is wrapped in @mtx.synchronize when a mutex exists and is done
# directly otherwise.
#
# @param pid  key under which the lane is stored (a process id)
# @param lane lane number the process occupies
# @return the stored lane value
def addPid(pid, lane)
  return @runpid[pid] = lane unless @mtx

  @mtx.synchronize { @runpid[pid] = lane }
end

#emptyLQ? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/nysol/mparallelmanager.rb', line 91

# Tells whether the lane queue (@LaneQue) currently holds no lane numbers.
#
# @return [Boolean] the result of @LaneQue.empty?
def emptyLQ?
  @LaneQue.empty?
end

#getLane(wait = true) ⇒ Object

空き実行レーン取得



154
155
156
157
# File 'lib/nysol/mparallelmanager.rb', line 154

# Takes the next free execution lane off the front of @LaneQue.
#
# @param wait [Boolean] when truthy and the queue is empty, waitLane is
#   called first so that a finished process can hand its lane back
# @return the lane number removed from the queue, or nil if the queue is
#   still empty
def getLane(wait = true)
  waitLane if wait && @LaneQue.empty?
  @LaneQue.shift
end

#runStateCheker ⇒ Object

メモリ,CPUチェッカー



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/nysol/mparallelmanager.rb', line 169

# Memory/CPU checker: starts a background thread that suspends and resumes
# managed processes.
#
# Every @thInterval seconds the thread asks MCMD::SysInfo.LimitOver_Mem_Cpu?
# (presumably "is memory/CPU usage over the limit" -- confirm against SysInfo)
# and then, while holding @mtx:
# * limit exceeded and more than one process running: sends STOP to the pids
#   returned by MCMD::SysInfo.cPIDs for the first running pid, and moves that
#   entry from @runpid to @slppid if at least one STOP succeeded;
# * otherwise: sends CONT to the pids of the oldest @slppid entry (if any)
#   and moves that entry back into @runpid.
#
# Any StandardError raised inside the loop is printed with p and followed by
# a call to exit.
#
# @return [Thread, nil] the checker thread, or nil when no mutex was created
def runStateCheker
  return unless @mtx

  # Resume the oldest suspended entry, if there is one.
  wake_one = lambda do
    unless @slppid.empty?
      pid, lane, stopped = @slppid.shift
      stopped.each do |px|
        begin
          Process.kill(:CONT, px)
        rescue
          puts "already finish #{px}"
        end
      end
      @runpid[pid] = lane
    end
  end

  # Suspend the first running entry together with the pids reported for it.
  halt_one = lambda do
    pid = @runpid.keys[0]
    targets = MCMD::SysInfo.cPIDs(pid)
    stopped = []
    targets.reverse_each do |px|
      begin
        Process.kill(:STOP, px)
        stopped << px
      rescue
        # a pid that can no longer be stopped is simply skipped
        puts "already finish #{px}"
      end
    end
    unless stopped.empty?
      lane = @runpid.delete(pid)
      @slppid << [pid, lane, stopped]
    end
  end

  Thread.new do
    begin
      loop do
        # The limit check itself runs outside the lock.
        over_limit = MCMD::SysInfo.LimitOver_Mem_Cpu?
        @mtx.synchronize do
          # Never suspend the last running process; with one or none left,
          # a suspended one is resumed even while over the limit.
          if over_limit && @runpid.size > 1
            halt_one.call
          else
            wake_one.call
          end
        end
        sleep @thInterval
      end
    rescue => err
      p err
      exit
    end
  end
end

#waitall ⇒ Object

全プロセス終了確認



145
146
147
148
149
150
151
# File 'lib/nysol/mparallelmanager.rb', line 145

# Waits until every managed process has finished.
#
# Calls waitLane repeatedly until both @runpid (running) and @slppid
# (suspended) are empty.
#
# @return [Array] all lane numbers returned by the waitLane calls, in order
def waitall
  released = []
  released.concat(waitLane) until @runpid.empty? && @slppid.empty?
  released
end

#waitLane ⇒ Object

プロセス終了確認



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/nysol/mparallelmanager.rb', line 96

# Waits until a running process has finished and returns its lane to the
# free-lane queue.
#
# The pids in @runpid are polled with Process.waitpid2(pid, WNOHANG). When
# one of them has exited, its entry is removed from @runpid and its lane is
# appended to @LaneQue. If polling raises (for example because a pid is no
# longer a waitable child), every entry still in @runpid is treated as
# finished: all their lanes are released and @runpid is cleared.
#
# Changes from the earlier implementation:
# * a short pause is taken between polling passes instead of spinning on
#   waitpid2 without a break;
# * polling walks a snapshot of the keys, so another thread inserting into
#   @runpid (as the runStateCheker thread does) no longer hits a hash that is
#   in the middle of being iterated.
#
# @return [Array] lane numbers released by this call
def waitLane
  finLane = []
  poll_pause = 0.01 # seconds to rest between two polling passes

  loop do
    rpid = nil
    begin
      loop do
        @runpid.keys.each do |pid|
          rpid, = Process.waitpid2(pid, Process::WNOHANG)
          break if rpid
        end
        break if rpid
        sleep poll_pause
      end
    rescue
      # Best effort, as before: any error while polling releases every lane
      # that is still registered as running.
      release_all = lambda do
        @runpid.each_value do |lane|
          finLane.push(lane)
          @LaneQue.push(lane)
        end
        @runpid.clear
      end
      if @mtx
        @mtx.synchronize(&release_all)
      else
        release_all.call
      end
      break
    end

    lane = if @mtx
             @mtx.synchronize { @runpid.delete(rpid) }
           else
             @runpid.delete(rpid)
           end
    # lane is nil when the reaped pid is no longer registered as running
    # (e.g. it was moved to @slppid meanwhile); keep waiting in that case.
    unless lane.nil?
      finLane.push(lane)
      @LaneQue.push(lane)
      break
    end
  end

  finLane
end