Class: OodCore::Job::Adapters::PBSPro
- Inherits: OodCore::Job::Adapter
- Inheritance chain: Object > OodCore::Job::Adapter > OodCore::Job::Adapters::PBSPro
- Defined in: lib/ood_core/job/adapters/pbspro.rb
Overview
An adapter object that describes the communication with a PBS Pro resource manager for job management.
Defined Under Namespace
Classes: Batch
Constant Summary
- STATE_MAP =
Mapping of state codes for PBSPro
    {
      'Q' => :queued,
      'W' => :queued,       # job is waiting for its submitter-assigned start time to be reached
      'H' => :queued_held,
      'T' => :queued_held,  # job is being moved to a new location
      'M' => :completed,    # job was moved to another server
      'R' => :running,
      'S' => :suspended,
      'U' => :suspended,    # cycle-harvesting job is suspended due to keyboard activity
      'E' => :running,      # job is exiting after having run
      'F' => :completed,    # job is finished
      'X' => :completed     # subjob has completed execution or has been deleted
      # ignore B as it signifies a job array
    }
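As a rough illustration of how such a map might be consulted, the sketch below looks up a PBS Pro state code and returns the matching status symbol. The helper name `status_for` and the `:undetermined` fallback for unmapped codes (such as `B`) are assumptions made for this example, not something this page documents.

```ruby
# Subset of the documented STATE_MAP, repeated so the sketch is self-contained.
STATE_MAP = {
  'Q' => :queued,
  'H' => :queued_held,
  'R' => :running,
  'S' => :suspended,
  'E' => :running,
  'F' => :completed
}.freeze

# Hypothetical helper: translate a PBS Pro state code into a status symbol.
# The :undetermined fallback is an assumption of this sketch.
def status_for(code)
  STATE_MAP.fetch(code.to_s, :undetermined)
end
```

Using `fetch` with a default keeps an unexpected code from raising `KeyError`; whether the adapter itself behaves this way is not stated here.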
Instance Attribute Summary
-
#qstat_factor ⇒ Float
readonly
The fraction of all jobs that a single owner must exceed before the adapter filters that owner's jobs out of one `qstat` of all jobs, rather than calling `qstat` on each of the owner's individual jobs (default 0.10).
Instance Method Summary
-
#delete(id) ⇒ void
Delete the submitted job.
-
#hold(id) ⇒ void
Put the submitted job on hold.
-
#info(id) ⇒ Info
Retrieve job info from the resource manager.
-
#info_all ⇒ Array<Info>
Retrieve info for all jobs from the resource manager.
-
#info_where_owner(owner) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager.
-
#initialize(opts = {}) ⇒ PBSPro
constructor
private
A new instance of PBSPro.
-
#release(id) ⇒ void
Release the job that is on hold.
-
#status(id) ⇒ Status
Retrieve job status from the resource manager.
-
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance.
Constructor Details
#initialize(opts = {}) ⇒ PBSPro
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of PBSPro.
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 188
    def initialize(opts = {})
      o = opts.to_h.compact.symbolize_keys

      @pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }
      @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
    end
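The option handling in the constructor can be tried outside the adapter with a small stand-alone sketch. The function name `parse_adapter_opts` is hypothetical, and `transform_keys(&:to_sym)` is swapped in for `symbolize_keys` so the example runs without ActiveSupport.

```ruby
# Hypothetical stand-alone version of the constructor's option handling.
# nil values are dropped first, so an explicit nil falls back to the default.
def parse_adapter_opts(opts = {})
  o = opts.to_h.compact.transform_keys(&:to_sym)

  pbspro = o.fetch(:pbspro) do
    raise ArgumentError, "No pbspro object specified. Missing argument: pbspro"
  end

  { pbspro: pbspro, qstat_factor: o.fetch(:qstat_factor, 0.10).to_f }
end
```

Because `compact` runs before `fetch`, passing `qstat_factor: nil` yields the default of 0.10 rather than `nil.to_f`, which would be 0.0.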
Instance Attribute Details
#qstat_factor ⇒ Float (readonly)
The fraction of all jobs that a single owner must exceed before the adapter filters that owner's jobs out of one `qstat` of all jobs, rather than calling `qstat` on each of the owner's individual jobs (default 0.10)
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 181
    def qstat_factor
      @qstat_factor
    end
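To make the threshold concrete, here is a minimal sketch of the comparison that `qstat_factor` feeds into. The predicate name `bulk_qstat?` and the job counts are invented for illustration; only the comparison itself comes from the adapter source shown on this page.

```ruby
# Hypothetical predicate: true when one bulk `qstat` is preferred over
# querying each of the owner's jobs individually.
def bulk_qstat?(user_job_count, total_job_count, qstat_factor = 0.10)
  user_job_count > (qstat_factor * total_job_count)
end
```

With the default factor, an owner holding 5 of 100 jobs would be queried job by job, while an owner holding 50 of 100 would be served from a single listing of all jobs.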
Instance Method Details
#delete(id) ⇒ void
This method returns an undefined value.
Delete the submitted job
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 358
    def delete(id)
      @pbspro.delete_job(id.to_s)
    rescue Batch::Error => e
      # assume successful job deletion if can't find job id
      raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    end
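The rescue clause above treats a job that is unknown or already finished as a successful operation. A minimal sketch of that pattern follows; `SketchBatchError`, `SketchAdapterError`, and `tolerate_missing_job` are hypothetical stand-ins, not part of the library.

```ruby
# Stand-ins for Batch::Error and JobAdapterError; hypothetical names.
class SketchBatchError < StandardError; end
class SketchAdapterError < StandardError; end

# Run a batch operation, treating "job is already gone" errors as success
# and re-raising every other batch error as an adapter error.
def tolerate_missing_job
  yield
  nil
rescue SketchBatchError => e
  raise SketchAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
```

A call such as `tolerate_missing_job { raise SketchBatchError, "Unknown Job Id 42" }` returns `nil`, whereas any other message surfaces as `SketchAdapterError`. The message text here is made up for the example.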
#hold(id) ⇒ void
This method returns an undefined value.
Put the submitted job on hold
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 334
    def hold(id)
      @pbspro.hold_job(id.to_s)
    rescue Batch::Error => e
      # assume successful job hold if can't find job id
      raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    end
#info(id) ⇒ Info
Retrieve job info from the resource manager
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 303
    def info(id)
      id = id.to_s
      @pbspro.get_jobs(id: id).map do |v|
        parse_job_info(v)
      end.first || Info.new(id: id, status: :completed)
    rescue Batch::Error => e
      # set completed status if can't find job id
      if /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
        Info.new(
          id: id,
          status: :completed
        )
      else
        raise JobAdapterError, e.message
      end
    end
#info_all ⇒ Array<Info>
Retrieve info for all jobs from the resource manager
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 270
    def info_all
      @pbspro.get_jobs.map do |v|
        parse_job_info(v)
      end
    rescue Batch::Error => e
      raise JobAdapterError, e.message
    end
#info_where_owner(owner) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 283
    def info_where_owner(owner)
      owner = Array.wrap(owner).map(&:to_s)

      usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
      all_jobs = @pbspro.select_jobs(args: ["-T"])

      # `qstat` all jobs if user has too many jobs, otherwise `qstat` each
      # individual job (default factor is 10%)
      if usr_jobs.size > (qstat_factor * all_jobs.size)
        super
      else
        usr_jobs.map { |id| info(id) }
      end
    end
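Since `owner` may be a single name or a list, the method first normalizes it and then joins the names into one comma-separated `-u` argument. The sketch below reproduces only that step; `owner_select_args` is a hypothetical name, and plain `Array()` is used in place of ActiveSupport's `Array.wrap`, which behaves the same for the strings, symbols, and arrays used here.

```ruby
# Hypothetical helper: build the `-u` selection arguments for one or more owners.
def owner_select_args(owner)
  owners = Array(owner).map(&:to_s)
  ["-u", owners.join(",")]
end
```

For example, `owner_select_args("alice")` gives `["-u", "alice"]`, and `owner_select_args([:alice, "bob"])` gives `["-u", "alice,bob"]`.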
#release(id) ⇒ void
This method returns an undefined value.
Release the job that is on hold
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 346
    def release(id)
      @pbspro.release_job(id.to_s)
    rescue Batch::Error => e
      # assume successful job release if can't find job id
      raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    end
#status(id) ⇒ Status
Retrieve job status from the resource manager
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 325
    def status(id)
      info(id.to_s).status
    end
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance
    # File 'lib/ood_core/job/adapters/pbspro.rb', line 210
    def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
      after      = Array(after).map(&:to_s)
      afterok    = Array(afterok).map(&:to_s)
      afternotok = Array(afternotok).map(&:to_s)
      afterany   = Array(afterany).map(&:to_s)

      # Set qsub options
      args = []
      # ignore args, can't use these if submitting from STDIN
      args += ["-h"] if script.submit_as_hold
      args += ["-r", script.rerunnable ? "y" : "n"] unless script.rerunnable.nil?
      args += ["-M", script.email.join(",")] unless script.email.nil?
      if script.email_on_started && script.email_on_terminated
        args += ["-m", "be"]
      elsif script.email_on_started
        args += ["-m", "b"]
      elsif script.email_on_terminated
        args += ["-m", "e"]
      end
      args += ["-N", script.job_name] unless script.job_name.nil?
      # ignore input_path (not defined in PBS Pro)
      args += ["-o", script.output_path] unless script.output_path.nil?
      args += ["-e", script.error_path] unless script.error_path.nil?
      # Reservations are actually just queues in PBS Pro
      args += ["-q", script.reservation_id] if !script.reservation_id.nil? && script.queue_name.nil?
      args += ["-q", script.queue_name] unless script.queue_name.nil?
      args += ["-p", script.priority] unless script.priority.nil?
      args += ["-a", script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")] unless script.start_time.nil?
      args += ["-A", script.accounting_id] unless script.accounting_id.nil?
      args += ["-l", "walltime=#{seconds_to_duration(script.wall_time)}"] unless script.wall_time.nil?

      # Set dependencies
      depend = []
      depend << "after:#{after.join(":")}" unless after.empty?
      depend << "afterok:#{afterok.join(":")}" unless afterok.empty?
      depend << "afternotok:#{afternotok.join(":")}" unless afternotok.empty?
      depend << "afterany:#{afterany.join(":")}" unless afterany.empty?
      args += ["-W", "depend=#{depend.join(",")}"] unless depend.empty?

      # Set environment variables
      envvars = script.job_environment.to_h
      args += ["-v", envvars.map{|k,v| "#{k}=#{v}"}.join(",")] unless envvars.empty?

      # If error_path is not specified we join stdout & stderr (as this
      # mimics what the other resource managers do)
      args += ["-j", "oe"] if script.error_path.nil?

      # Set native options
      args += script.native if script.native

      # Submit job
      @pbspro.submit_string(script.content, args: args, chdir: script.workdir)
    rescue Batch::Error => e
      raise JobAdapterError, e.message
    end
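Two pieces of the argument building in `submit` are easy to check in isolation: the `-W depend=...` string and the `-m` mail flags. The sketch below extracts them into stand-alone functions; the names `depend_args` and `mail_args` and the job ids are hypothetical, and the functions are meant to mirror the logic shown in the method rather than replace it.

```ruby
# Hypothetical helper: build the `-W depend=...` arguments from job id lists.
# Ids within one dependency type are joined by ":", types by ",".
def depend_args(after: [], afterok: [], afternotok: [], afterany: [])
  lists = {
    "after"      => after,
    "afterok"    => afterok,
    "afternotok" => afternotok,
    "afterany"   => afterany
  }
  depend = lists.map { |type, ids| [type, Array(ids).map(&:to_s)] }
                .reject { |_type, ids| ids.empty? }
                .map { |type, ids| "#{type}:#{ids.join(':')}" }
  depend.empty? ? [] : ["-W", "depend=#{depend.join(',')}"]
end

# Hypothetical helper: build the `-m` arguments from the two email switches.
def mail_args(on_started:, on_terminated:)
  flags = ""
  flags += "b" if on_started
  flags += "e" if on_terminated
  flags.empty? ? [] : ["-m", flags]
end
```

For instance, `depend_args(afterok: ["1.srv", "2.srv"], afterany: "3.srv")` produces `["-W", "depend=afterok:1.srv:2.srv,afterany:3.srv"]`, and both helpers return an empty array when nothing is requested, so their results can be appended to `args` unconditionally.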