Class: OodCore::Job::Adapters::Torque
- Inherits:
  OodCore::Job::Adapter
  - Object
    - OodCore::Job::Adapter
      - OodCore::Job::Adapters::Torque
- Defined in:
- lib/ood_core/job/adapters/torque.rb
Overview
An adapter object that describes the communication with a Torque resource manager for job management.
Constant Summary
- STATE_MAP =
  Mapping of state characters for PBS

    {
      'Q' => :queued,
      'H' => :queued_held,
      'T' => :queued_held,    # transiting, most like a held job
      'R' => :running,
      'S' => :suspended,
      'E' => :running,        # exiting, but still running
      'C' => :completed
    }
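The lookup that `#status` performs on this map can be sketched in isolation. The snippet below copies the mapping so that it runs on its own; `state_for` is a hypothetical helper name used only for illustration and is not part of OodCore.

```ruby
# Copy of the mapping above, repeated so this snippet is self-contained.
STATE_MAP = {
  'Q' => :queued,
  'H' => :queued_held,
  'T' => :queued_held,  # transiting, most like a held job
  'R' => :running,
  'S' => :suspended,
  'E' => :running,      # exiting, but still running
  'C' => :completed
}.freeze

# Hypothetical helper: translate a one-character job state into a symbol,
# falling back to :undetermined for any character missing from the map.
def state_for(char)
  STATE_MAP.fetch(char, :undetermined)
end

puts state_for('R')  # prints "running"
puts state_for('W')  # prints "undetermined"
```

Using `fetch` with a default means an unexpected state character yields `:undetermined` rather than `nil` or an exception.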
Instance Method Summary
- #delete(id) ⇒ void
  Delete the submitted job.
- #hold(id) ⇒ void
  Put the submitted job on hold.
- #info(id) ⇒ Info
  Retrieve job info from the resource manager.
- #info_all ⇒ Array<Info>
  Retrieve info for all jobs from the resource manager.
- #info_where_owner(owner) ⇒ Array<Info>
  Retrieve info for all jobs for a given owner or owners from the resource manager.
- #initialize(opts = {}) ⇒ Torque (constructor, private)
  A new instance of Torque.
- #release(id) ⇒ void
  Release the job that is on hold.
- #status(id) ⇒ Status
  Retrieve job status from the resource manager.
- #submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
  Submit a job with the attributes defined in the job template instance.
Constructor Details
#initialize(opts = {}) ⇒ Torque
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Torque.
    # File 'lib/ood_core/job/adapters/torque.rb', line 48
    def initialize(opts = {})
      o = opts.to_h.symbolize_keys

      @pbs = o.fetch(:pbs) { raise ArgumentError, "No pbs object specified. Missing argument: pbs" }
    end
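The option handling in the constructor can be illustrated without the library. This is a minimal sketch: `extract_pbs` is a hypothetical stand-in for the constructor body, and it uses core Ruby's `transform_keys(&:to_sym)` in place of ActiveSupport's `symbolize_keys` so that it runs without extra dependencies.

```ruby
# Hypothetical stand-in for the constructor's option handling.
# Accepts string or symbol keys and requires a :pbs entry.
def extract_pbs(opts = {})
  o = opts.to_h.transform_keys(&:to_sym)
  o.fetch(:pbs) { raise ArgumentError, "No pbs object specified. Missing argument: pbs" }
end

puts extract_pbs("pbs" => "batch-object")  # prints "batch-object"

begin
  extract_pbs({})
rescue ArgumentError => e
  puts e.message  # prints "No pbs object specified. Missing argument: pbs"
end
```

Passing a block to `fetch` defers the error until the key is actually missing, which keeps the happy path to a single expression.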
Instance Method Details
#delete(id) ⇒ void
This method returns an undefined value.
Delete the submitted job
    # File 'lib/ood_core/job/adapters/torque.rb', line 222
    def delete(id)
      @pbs.delete_job(id.to_s)
    rescue PBS::UnkjobidError, PBS::BadstateError
      # assume successful job deletion if can't find job id
      # assume successful job deletion if job is exiting or completed
      nil
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
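The same two-tier rescue pattern appears in `#hold`, `#release`, `#info`, and `#status`: specific PBS errors are treated as benign, and every other PBS error is re-raised as a `JobAdapterError`. The sketch below reproduces that control flow with stand-in classes. `FakePBS`, `FakeBatch`, and `delete_job_via` are hypothetical names invented for the example; the local `JobAdapterError` is a stub, not the library's class.

```ruby
# Stand-in error hierarchy (hypothetical; mirrors the shape used above).
module FakePBS
  class Error < StandardError; end
  class UnkjobidError < Error; end
  class BadstateError < Error; end
end

# Stub of the adapter-level error, defined here only so the sketch runs.
class JobAdapterError < StandardError; end

# Hypothetical batch object that raises a preset error on delete.
class FakeBatch
  def initialize(error = nil)
    @error = error
  end

  def delete_job(_id)
    raise @error if @error
    nil
  end
end

# Same control flow as #delete: swallow "unknown job" and "bad state",
# wrap anything else from the PBS layer in JobAdapterError.
def delete_job_via(batch, id)
  batch.delete_job(id.to_s)
rescue FakePBS::UnkjobidError, FakePBS::BadstateError
  nil
rescue FakePBS::Error => e
  raise JobAdapterError, e.message
end

delete_job_via(FakeBatch.new(FakePBS::UnkjobidError.new("no such job")), 42)

begin
  delete_job_via(FakeBatch.new(FakePBS::Error.new("server down")), 42)
rescue JobAdapterError => e
  puts e.message  # prints "server down"
end
```

The order of the `rescue` clauses matters: the subclass clause must come first, otherwise the general `Error` clause would catch everything.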
#hold(id) ⇒ void
This method returns an undefined value.
Put the submitted job on hold
    # File 'lib/ood_core/job/adapters/torque.rb', line 194
    def hold(id)
      @pbs.hold_job(id.to_s)
    rescue PBS::UnkjobidError
      # assume successful job hold if can't find job id
      nil
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
#info(id) ⇒ Info
Retrieve job info from the resource manager
    # File 'lib/ood_core/job/adapters/torque.rb', line 160
    def info(id)
      id = id.to_s
      parse_job_info(*@pbs.get_job(id).flatten)
    rescue PBS::UnkjobidError
      # set completed status if can't find job id
      Info.new(
        id: id,
        status: :completed
      )
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
#info_all ⇒ Array<Info>
Retrieve info for all jobs from the resource manager
    # File 'lib/ood_core/job/adapters/torque.rb', line 129
    def info_all
      @pbs.get_jobs.map do |k, v|
        parse_job_info(k, v)
      end
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
#info_where_owner(owner) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager
    # File 'lib/ood_core/job/adapters/torque.rb', line 142
    def info_where_owner(owner)
      owner = Array.wrap(owner).map(&:to_s)
      @pbs.select_jobs(
        attribs: [
          { name: "User_List", value: owner.join(","), op: :eq }
        ]
      ).map do |k, v|
        parse_job_info(k, v)
      end
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
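The owner argument may be a single name or a list, and it ends up as one comma-separated `User_List` value. A minimal sketch of that normalization follows; `user_list_value` is a hypothetical helper, and it uses core Ruby's `Array()` rather than ActiveSupport's `Array.wrap` (the two behave the same for the strings, symbols, and arrays shown here).

```ruby
# Hypothetical helper: normalize one owner or many owners into the
# comma-separated string used as the User_List filter value.
def user_list_value(owner)
  Array(owner).map(&:to_s).join(",")
end

puts user_list_value("alice")           # prints "alice"
puts user_list_value([:alice, "bob"])   # prints "alice,bob"
```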
#release(id) ⇒ void
This method returns an undefined value.
Release the job that is on hold
    # File 'lib/ood_core/job/adapters/torque.rb', line 208
    def release(id)
      @pbs.release_job(id.to_s)
    rescue PBS::UnkjobidError
      # assume successful job release if can't find job id
      nil
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
#status(id) ⇒ Status
Retrieve job status from resource manager
    # File 'lib/ood_core/job/adapters/torque.rb', line 178
    def status(id)
      id = id.to_s
      char = @pbs.get_job(id, filters: [:job_state])[id][:job_state]
      Status.new(state: STATE_MAP.fetch(char, :undetermined))
    rescue PBS::UnkjobidError
      # set completed status if can't find job id
      Status.new(state: :completed)
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance
    # File 'lib/ood_core/job/adapters/torque.rb', line 68
    def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
      after      = Array(after).map(&:to_s)
      afterok    = Array(afterok).map(&:to_s)
      afternotok = Array(afternotok).map(&:to_s)
      afterany   = Array(afterany).map(&:to_s)

      # Set headers
      headers = {}
      headers.merge!(job_arguments: script.args.join(' ')) unless script.args.nil?
      headers.merge!(Hold_Types: :u) if script.submit_as_hold
      headers.merge!(Rerunable: script.rerunnable ? 'y' : 'n') unless script.rerunnable.nil?
      headers.merge!(init_work_dir: script.workdir) unless script.workdir.nil?
      headers.merge!(Mail_Users: script.email.join(',')) unless script.email.nil?
      mail_points  = ''
      mail_points += 'b' if script.email_on_started
      mail_points += 'e' if script.email_on_terminated
      headers.merge!(Mail_Points: mail_points) unless mail_points.empty?
      headers.merge!(Job_Name: script.job_name) unless script.job_name.nil?
      # ignore input_path (not defined in Torque)
      headers.merge!(Output_Path: script.output_path) unless script.output_path.nil?
      headers.merge!(Error_Path: script.error_path) unless script.error_path.nil?
      # If error_path is not specified we join stdout & stderr (as this
      # mimics what the other resource managers do)
      headers.merge!(Join_Path: 'oe') if script.error_path.nil?
      headers.merge!(reservation_id: script.reservation_id) unless script.reservation_id.nil?
      headers.merge!(Priority: script.priority) unless script.priority.nil?
      headers.merge!(Execution_Time: script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")) unless script.start_time.nil?
      headers.merge!(Account_Name: script.accounting_id) unless script.accounting_id.nil?

      # Set dependencies
      depend = []
      depend << "after:#{after.join(':')}"           unless after.empty?
      depend << "afterok:#{afterok.join(':')}"       unless afterok.empty?
      depend << "afternotok:#{afternotok.join(':')}" unless afternotok.empty?
      depend << "afterany:#{afterany.join(':')}"     unless afterany.empty?
      headers.merge!(depend: depend.join(',')) unless depend.empty?

      # Set resources
      resources = {}
      resources.merge!(walltime: seconds_to_duration(script.wall_time)) unless script.wall_time.nil?

      # Set environment variables
      envvars = script.job_environment || {}

      # Set native options
      if script.native
        headers.merge!   script.native.fetch(:headers, {})
        resources.merge! script.native.fetch(:resources, {})
        envvars.merge!   script.native.fetch(:envvars, {})
      end

      # Submit job
      @pbs.submit_string(script.content, queue: script.queue_name, headers: headers, resources: resources, envvars: envvars)
    rescue PBS::Error => e
      raise JobAdapterError, e.message
    end
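Two of the header values built in `#submit` are easy to get wrong by hand: the `depend` string and the `Execution_Time` timestamp. The sketch below isolates both. `depend_header` and `execution_time` are hypothetical helper names introduced for illustration; they follow the formatting shown in the listing but are not part of OodCore.

```ruby
# Hypothetical helper: build the value of the `depend` header.
# Each non-empty dependency type becomes "type:id1:id2", and the
# types are joined with commas.
def depend_header(after: [], afterok: [], afternotok: [], afterany: [])
  groups = {
    "after"      => after,
    "afterok"    => afterok,
    "afternotok" => afternotok,
    "afterany"   => afterany
  }
  groups.map { |type, ids| [type, Array(ids).map(&:to_s)] }
        .reject { |_type, ids| ids.empty? }
        .map { |type, ids| "#{type}:#{ids.join(':')}" }
        .join(",")
end

# Hypothetical helper: format a start time the way #submit does for the
# Execution_Time header (century, year, month, day, hour, minute,
# then a dot and the seconds, all in local time).
def execution_time(time)
  time.localtime.strftime("%C%y%m%d%H%M.%S")
end

puts depend_header(afterok: ["123.host", 124], afterany: "125")
# prints "afterok:123.host:124,afterany:125"

puts execution_time(Time.new(2024, 1, 2, 3, 4, 5))
# prints "202401020304.05"
```

Note that `depend_header` returns an empty string when no dependencies are given, whereas `#submit` omits the `depend` header entirely in that case.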