Class: Comana::ComputationManager
- Inherits: Object
  - Object
    - Comana::ComputationManager
- Defined in: lib/comana/computationmanager.rb
Overview
This class provides a framework for scientific computations. To support a particular kind of computation, users redefine some of its methods in a subclass.
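Judging from the source of #execute and #state shown on this page, a subclass is expected to supply calculate, finished?, prepare_next and started?. The following is a minimal sketch of that pattern, not the library's own code: MiniManager is a hypothetical stand-in that copies only the loop of #execute (the lock directory is left out) so that the example runs without the comana gem, and CountdownCalc with its counter is invented for illustration.

```ruby
# Hypothetical stand-in for Comana::ComputationManager.
# Only the calculate / finished? / prepare_next loop is reproduced.
class MiniManager
  attr_reader :dir

  def initialize(dir)
    @dir = dir
  end

  def execute
    loop do
      calculate
      break if finished?
      prepare_next
    end
  end
  alias start execute
end

# Hypothetical subclass: "calculates" until a counter runs down.
class CountdownCalc < MiniManager
  attr_reader :runs

  def initialize(dir, steps)
    super(dir)
    @remaining = steps
    @runs = 0
  end

  # One unit of work; here it only counts how often it was called.
  def calculate
    @runs += 1
  end

  # True when no further calculation is needed.
  def finished?
    @remaining <= 1
  end

  # Set up the next round of the calculation.
  def prepare_next
    @remaining -= 1
  end
end

calc = CountdownCalc.new(".", 3)
calc.start
puts calc.runs  # 3
```

The subclass never touches the loop itself; it only answers "do one step", "are we done?" and "set up the next step".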
Defined Under Namespace
Classes: AlreadyStartedError, AlreadySubmittedError, ExecuteError, InitializeError, NotImplementedError
Constant Summary
- QSUB_SCRIPT_NAME = 'qsub.sh'
- QSUB_LOG_NAME = 'qsub.log'
Instance Attribute Summary
- #dir ⇒ Object (readonly)
  Returns the value of attribute dir.
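Class Method Summary
- .effective_queue(queues, jobs, hosts, benchmarks) ⇒ Object
  If any queue has jobs < hosts (that is, a free host), returns the one among them with the smallest benchmark. Otherwise, returns the queue with the smallest self.guess_end_time value.
- .execute(args) ⇒ Object
- .guess_end_time(num_jobs, num_hosts, benchmark) ⇒ Object
  Estimated completion time when one new job is added. Calculated on the assumption that running jobs finish at random times and that all jobs have the same run time. If a host is free, the estimate is the benchmark itself. If no host is free, the expected waiting time until a host becomes free is added.
- .qsub(args, options) ⇒ Object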
Instance Method Summary
- #execute ⇒ Object (also: #start)
  Executes the calculation.
- #initialize(dir) ⇒ ComputationManager (constructor)
  Redefine in a subclass to change the default values.
- #latest_modified_time ⇒ Object
  Returns the latest modification time among the files in the calculation directory, searched recursively.
- #queue_submit(q_name:, pe_name:, ppn:, ld_library_path:, command:, submit: true) ⇒ Object
- #state ⇒ Object
  Returns a symbol that indicates the state of the calculation.
Constructor Details
#initialize(dir) ⇒ ComputationManager
Redefine in a subclass to change the default values.
    # File 'lib/comana/computationmanager.rb', line 20
    def initialize(dir)
      @dir = dir
      # redefine in subclass.
      @lockdir = "lock_comana"
      @alive_time = 3600
    end
Instance Attribute Details
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
    # File 'lib/comana/computationmanager.rb', line 17
    def dir
      @dir
    end
Class Method Details
.effective_queue(queues, jobs, hosts, benchmarks) ⇒ Object
If any queue has jobs < hosts (that is, a free host), returns the one among them with the smallest benchmark. Otherwise, returns the queue with the smallest self.guess_end_time value.
    # File 'lib/comana/computationmanager.rb', line 115
    def self.effective_queue(queues, jobs, hosts, benchmarks)
      candidates = queues.select do |q|
        jobs[q] < hosts[q]
      end

      if candidates.empty?
        result = queues.min_by{|q| self.guess_end_time(jobs[q], hosts[q], benchmarks[q]) }
      else
        result = candidates.min_by {|q| benchmarks[q]}
      end
      result
    end
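To illustrate the selection rule, the sketch below copies the two class methods into standalone functions so that it runs without the comana gem or a Grid Engine installation. The queue names and all numbers are made up for the example.

```ruby
# Standalone copies of the two class methods, for illustration only.
def guess_end_time(num_jobs, num_hosts, benchmark)
  unit = num_jobs < num_hosts ? 1.0 : (num_jobs + 1.0) / num_hosts
  unit * benchmark
end

def effective_queue(queues, jobs, hosts, benchmarks)
  candidates = queues.select { |q| jobs[q] < hosts[q] }
  if candidates.empty?
    queues.min_by { |q| guess_end_time(jobs[q], hosts[q], benchmarks[q]) }
  else
    candidates.min_by { |q| benchmarks[q] }
  end
end

# Hypothetical queues: fast.q has 2 hosts, slow.q has 4.
queues     = ["fast.q", "slow.q"]
hosts      = { "fast.q" => 2, "slow.q" => 4 }
benchmarks = { "fast.q" => 100.0, "slow.q" => 300.0 }

# Only slow.q has a free host, so it is chosen.
with_free = effective_queue(queues, { "fast.q" => 2, "slow.q" => 1 }, hosts, benchmarks)

# Both queues are full: fast.q -> (2 + 1) / 2 * 100 = 150,
#                       slow.q -> (4 + 1) / 4 * 300 = 375.
all_busy = effective_queue(queues, { "fast.q" => 2, "slow.q" => 4 }, hosts, benchmarks)

puts with_free  # slow.q
puts all_busy   # fast.q
```

Note that in the first case a queue with a free host wins even though the estimate for waiting on fast.q (150) is lower than the benchmark of slow.q (300); the estimate is only consulted when no queue has a free host.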
.execute(args) ⇒ Object
    # File 'lib/comana/computationmanager.rb', line 26
    def self.execute(args)
      targets = args
      targets = [ENV['PWD']] if targets.empty?

      targets.each do |dir|
        print "#{dir}..."
        begin
          calc_dir = self.new(dir)
        rescue => exc
          puts "Not #{self}: #{dir}"
          next
        end

        begin
          calc_dir.start
        rescue self::AlreadyStartedError
          puts "Already started: #{dir}"
          next
        end
      end
    end
.guess_end_time(num_jobs, num_hosts, benchmark) ⇒ Object
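Estimated completion time when one new job is added. Calculated on the assumption that running jobs finish at random times and that all jobs have the same run time. If a host is free, the estimate is the benchmark itself. If no host is free, the expected waiting time until a host becomes free is added.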
    # File 'lib/comana/computationmanager.rb', line 131
    def self.guess_end_time(num_jobs , num_hosts, benchmark)
      if num_jobs < num_hosts
        unit = 1.0 #unit
      else
        unit = (num_jobs.to_f + 1.0) / (num_hosts.to_f)
      end
      result = unit * benchmark
      result
    end
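As a worked example of the estimate, the sketch below re-implements the same formula as a standalone method (so it runs without the comana gem) and evaluates it for a queue with a free host and for a fully occupied one. The job counts, host counts and the benchmark of 100.0 are arbitrary illustration values.

```ruby
# Standalone copy of the formula, for illustration only.
def guess_end_time(num_jobs, num_hosts, benchmark)
  unit = if num_jobs < num_hosts
           1.0                                      # a host is free
         else
           (num_jobs.to_f + 1.0) / num_hosts.to_f   # wait for a host first
         end
  unit * benchmark
end

free = guess_end_time(2, 4, 100.0)  # 2 jobs on 4 hosts
busy = guess_end_time(4, 4, 100.0)  # 4 jobs on 4 hosts: (4 + 1) / 4 * 100

puts free  # 100.0
puts busy  # 125.0
```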
.qsub(args, options) ⇒ Object
    # File 'lib/comana/computationmanager.rb', line 48
    def self.qsub(args, options)
      if options[:auto] || options[:load_group]
        # OK
      elsif !(options[:q_name] && options[:pe_name] && options[:ppn] && options[:command])
        puts "Lack of required options."
        puts "Need (--auto) or (--load-group) or (--q-name && --pe-name && --ppn && --command )"
        puts "E.g., OK: #{File.basename($0)} --auto"
        puts " OK: #{File.basename($0)} --load-group=cluster_name"
        puts " OK: #{File.basename($0)} --q-name=a --pe-name=b --ppn=1 --command=c"
        puts " NG: #{File.basename($0)} --q-name=a --pe-name=b --ppn=1"
        puts "Exit."
        exit
      end

      tgts = args
      tgts = [ENV['PWD']] if tgts.empty?

      tgts.each do |dir|
        cs = Comana::ClusterSetting.load_file

        if options[:load_group]
          q_name = options[:load_group]
        elsif options[:auto]
          queues = Comana::GridEngine.queues
          jobs = {}
          hosts = {}
          benchmarks = {}
          queues.each do |q|
            jobs[q] = Comana::GridEngine.queue_jobs(q).size
            hosts[q] = Comana::GridEngine.queue_alive_nums[q]
            benchmarks[q] = Comana::ClusterSetting.load_file.settings_queue(q)['benchmark']
          end
          q_name = self.effective_queue(queues, jobs, hosts, benchmarks)
        end

        if options[:load_group] || options[:auto]
          gs = cs.settings_queue(q_name)
          q_name ||= gs['queue']
          pe_name ||= gs['pe']
          ppn ||= gs['ppn']
          ld_library_path ||= gs['ld_library_path']
        end

        q_name = options[:q_name] if options[:q_name]
        pe_name = options[:pe_name] if options[:pe_name]
        ppn = options[:ppn] if options[:ppn]
        ld_library_path = options[:ld_library_path] if options[:ld_library_path]
        command = options[:command] || "#{`which #{__FILE__}`.chomp} execute"

        begin
          calc_dir = self.new(dir)
          calc_dir.queue_submit(
            q_name: q_name,
            pe_name: pe_name,
            ppn: ppn,
            ld_library_path: ld_library_path,
            command: command
          )
        rescue self::InitializeError
          puts "Not #{self} : #{dir}"
        rescue Comana::ComputationManager::AlreadySubmittedError
          puts "Already started: #{dir}"
        end
      end
    end
Instance Method Details
#execute ⇒ Object Also known as: start
Executes the calculation. If the lock directory of the ComputationManager already exists, raises ComputationManager::AlreadyStartedError, because another process has already started the calculation. This method is aliased as 'start'.
    # File 'lib/comana/computationmanager.rb', line 157
    def execute
      begin
        Dir.mkdir "#{@dir}/#{@lockdir}"
      rescue Errno::EEXIST
        raise AlreadyStartedError, "Exist #{@dir}/#{@lockdir}"
      end

      while true
        calculate
        break if finished?
        prepare_next
      end
    end
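The guard at the top of #execute relies on Dir.mkdir raising Errno::EEXIST when the directory is already there, so creating the lock directory doubles as the "has anyone started?" check. The sketch below isolates that idea in a temporary directory; acquire_lock and the top-level AlreadyStartedError are hypothetical names for this example, not part of the library.

```ruby
require "tmpdir"

# Hypothetical error class standing in for ComputationManager::AlreadyStartedError.
class AlreadyStartedError < StandardError; end

# Hypothetical helper: create the lock directory or fail if it exists.
def acquire_lock(dir, lockdir = "lock_comana")
  Dir.mkdir(File.join(dir, lockdir))
rescue Errno::EEXIST
  raise AlreadyStartedError, "Exist #{dir}/#{lockdir}"
end

results = []
Dir.mktmpdir do |dir|
  acquire_lock(dir)            # first caller creates the lock directory
  results << :first_ok
  begin
    acquire_lock(dir)          # second caller finds it already present
  rescue AlreadyStartedError
    results << :second_rejected
  end
end

p results  # [:first_ok, :second_rejected]
```

Because the lock directory is not removed in the source shown here, a second #execute on the same directory is rejected even after the first one has ended.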
#latest_modified_time ⇒ Object
Returns the latest modification time among the files in the calculation directory, searched recursively.
    # File 'lib/comana/computationmanager.rb', line 174
    def latest_modified_time
      tmp = Dir.glob("#{@dir}/**/*").max_by do |file|
        File.mtime(file)
      end
      File.mtime(tmp)
    end
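A small sketch of the same lookup, written as a standalone function that takes the directory as an argument (an adaptation for this example; the real method reads @dir). File names and timestamps are arbitrary. Since the glob pattern also matches subdirectories, their modification times take part in the comparison as well.

```ruby
require "tmpdir"
require "fileutils"

# Standalone variant of the method, for illustration only.
def latest_modified_time(dir)
  newest = Dir.glob("#{dir}/**/*").max_by { |path| File.mtime(path) }
  File.mtime(newest)
end

old_time = Time.at(1_000_000_000)
new_time = Time.at(1_500_000_000)
latest = nil

Dir.mktmpdir do |dir|
  FileUtils.mkdir_p("#{dir}/sub")
  File.write("#{dir}/a.txt", "a")
  File.write("#{dir}/sub/b.txt", "b")
  # The glob also returns the directory "sub", so give it the old time too.
  File.utime(old_time, old_time, "#{dir}/a.txt", "#{dir}/sub")
  File.utime(new_time, new_time, "#{dir}/sub/b.txt")
  latest = latest_modified_time(dir)
end

puts latest == new_time  # true
```

For an empty directory the glob yields no entries and max_by returns nil, so a caller of this sketch would need to handle that case separately.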
#queue_submit(q_name:, pe_name:, ppn:, ld_library_path:, command:, submit: true) ⇒ Object
    # File 'lib/comana/computationmanager.rb', line 181
    def queue_submit(q_name:, pe_name:, ppn:, ld_library_path: , command:, submit: true)
      qsub_path = "#{@dir}/#{QSUB_SCRIPT_NAME}"
      if FileTest.exist? qsub_path
        raise AlreadySubmittedError, "Already exist #{qsub_path}."
      end

      File.open(qsub_path, "w") do |io|
        Comana::GridEngine.write_qsub_script(
          q_name: q_name,
          pe_name: pe_name,
          ppn: ppn,
          ld_library_path: ld_library_path,
          command: command,
          io: io
        )
      end

      cur_dir = Dir.pwd
      Dir.chdir @dir
      print "Submitting #{qsub_path}..."
      system("qsub #{QSUB_SCRIPT_NAME} > #{QSUB_LOG_NAME}") if submit
      puts "Done."
      Dir.chdir cur_dir
    end
#state ⇒ Object
Returns a symbol that indicates the state of the calculation.
- :yet        not started
- :started    started but not yet ended; this includes the case where only a short time has passed since the last output
- :terminated started, but no output for a long time
- :finished   started, ended normally, and no further calculation is needed
    # File 'lib/comana/computationmanager.rb', line 146
    def state
      return :finished if finished?
      return :yet unless started?
      return :terminated if (Time.now - latest_modified_time > @alive_time)
      return :started
    end
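The order of the checks matters: finished? is tested first, then started?, and only then the age of the last output. The sketch below restates that decision as a pure function so it can be tried without a real calculation directory; the keyword arguments (finished, started, last_output, alive_time, now) are hypothetical stand-ins for finished?, started?, latest_modified_time, @alive_time and Time.now.

```ruby
# Pure-function restatement of the decision order, for illustration only.
def state(finished:, started:, last_output:, alive_time: 3600, now: Time.now)
  return :finished if finished
  return :yet unless started
  return :terminated if now - last_output > alive_time
  :started
end

now = Time.at(1_500_000_000)

s1 = state(finished: false, started: false, last_output: now,        now: now)
s2 = state(finished: false, started: true,  last_output: now - 60,   now: now)
s3 = state(finished: false, started: true,  last_output: now - 7200, now: now)
s4 = state(finished: true,  started: true,  last_output: now - 7200, now: now)

p [s1, s2, s3, s4]  # [:yet, :started, :terminated, :finished]
```

With the default alive_time of 3600 seconds, a calculation whose newest file is two hours old is reported as :terminated unless it has already finished.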