Class: OSC::Machete::Job
- Inherits: Object
- Defined in: lib/osc/machete/job.rb
Overview
Core object for working with batch jobs, including:
- submitting jobs
- checking job status
- setting dependencies between jobs via a directed acyclic graph
Create a new Job from a script:
job = Job.new(script: '/nfs/17/efranz/jobs/1/script.sh')
job.submitted? #=> false
job.path #=> '/nfs/17/efranz/jobs/1'
job.script_name #=> 'script.sh'
job.status #=> OSC::Machete::Status.not_submitted
job.pbsid #=> nil
# PBS_O_WORKDIR will be set to the directory containing the script
job.submit
job.submitted? #=> true
job.status #=> "Q"
job.pbsid #=> "3422735.oak-batch"
# if you know the pbs id you can instantiate a
# Job object to ask for the status of it
job2 = Job.new(pbsid: "3422735.oak-batch")
job2.status #=> "Q"
# because the object was created with only the pbsid passed in,
# path and script_name and dependency information is not available
job2.path #=> nil
job2.script_name #=> nil
# but an unknown pbsid results in status nil
job3 = Job.new(pbsid: "12345.oak-batch")
job3.status #=> nil
Create two Job instances and form a dependency between them:
job1 = Job.new(script: '/nfs/17/efranz/jobs/1/script.sh')
job2 = Job.new(script: '/nfs/17/efranz/jobs/1/post.sh')
job2.afterany(job1) # job2 runs after job1 completes with any exit status
job1.submit
job2.submit
job1.status #=> "Q"
job2.status #=> "H"
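The dependency types used above (after, afterany, afterok, afternotok) match the dependency types of Torque's `qsub -W depend=...` option. As a minimal sketch, assuming a hash of dependency ids keyed by type, the value of that option could be assembled as follows. `depend_arg` is a hypothetical helper written for illustration; it is not part of OSC::Machete, and the real TorqueHelper may build the argument differently.

```ruby
# Hypothetical helper (not part of OSC::Machete): formats a hash of
# dependency ids as the value of Torque's `qsub -W depend=...` option,
# i.e. "type:id[:id...]" entries joined by commas.
def depend_arg(ids)
  ids.reject { |_type, pbsids| pbsids.empty? }          # skip types with no ids
     .map { |type, pbsids| "#{type}:#{pbsids.join(':')}" }
     .join(',')
end

# The job id is the one from the example above; afterok has no ids and is dropped.
puts depend_arg(afterany: ['3422735.oak-batch'], afterok: [])
```

Several ids of the same type are joined with colons, and different types are separated by commas.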
Defined Under Namespace
Classes: ScriptMissingError
Class Attribute Summary
- .default_account_string ⇒ Object
  Set this to change the billable account that is used by default.
Instance Attribute Summary
- #account_string ⇒ Object (readonly)
  Returns the value of attribute account_string.
- #host ⇒ Object (readonly)
  Returns the value of attribute host.
- #pbsid ⇒ String? (readonly)
  The PBS job id, or nil if not set.
- #script_path ⇒ String? (readonly)
  Path of the job script, or nil if not set.
Instance Method Summary
- #after(jobs) ⇒ self
  Ensure Job starts only after the specified Job(s) start.
- #afterany(jobs) ⇒ self
  Ensure Job starts only after the specified Job(s) complete.
- #afternotok(jobs) ⇒ self
  Ensure Job starts only after the specified Job(s) complete with error return value.
- #afterok(jobs) ⇒ self
  Ensure Job starts only after the specified Job(s) complete with successful return value.
- #delete(rmdir: false) ⇒ nil
  Kill the currently running batch job.
- #initialize(args) ⇒ Job (constructor)
  Create Job instance to represent an unsubmitted batch job from the specified script, or an existing, already submitted batch job from the specified pbsid.
- #path ⇒ String?
  Job directory or nil if instance wasn’t initialized with a script.
- #script_name ⇒ String?
  Script name or nil if instance wasn’t initialized with a script.
- #status ⇒ Status
  Perform a qstat and return a Status value object representing the status of the job.
- #submit ⇒ Object
  Submit any jobs this job depends on that haven’t been submitted, then submit this job, specifying dependencies as required by Torque.
- #submitted? ⇒ Boolean
  Check whether the job has been submitted.
Constructor Details
#initialize(args) ⇒ Job
Create Job instance to represent an unsubmitted batch job from the specified script, or an existing, already submitted batch job from the specified pbsid.
Takes params in options hash as single argument:
Job.new(script: '/path/to/job/dir/go.sh')
or
opts = { script: '/path/to/job/dir/go.sh' }
Job.new(opts)
The Job class assumes that a job’s PBS_O_WORKDIR will be the directory containing the shell script that is run.
# File 'lib/osc/machete/job.rb', line 88
def initialize(args)
  @script_path = Pathname.new(args[:script]).cleanpath unless args[:script].nil?
  # @script_path = @script_path.expand_path would change this to absolute path

  @pbsid = args[:pbsid]
  @host = args[:host]
  @torque = args[:torque_helper] || OSC::Machete::TorqueHelper.default
  @account_string = args[:account_string] || self.class.default_account_string

  @dependencies = {} # {:afterany => [Job, Job], :afterok => [Job]}
end
Class Attribute Details
.default_account_string ⇒ Object
Set this to change the billable account that is used by default.
# File 'lib/osc/machete/job.rb', line 63
def default_account_string
  @default_account_string
end
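The class-level default works as a fallback: the constructor uses `args[:account_string]` when given and otherwise reads the class attribute. The following is a simplified stand-in, not the real class, that isolates this pattern. `AccountSketchJob` and the account names are made up for illustration.

```ruby
# Simplified stand-in for OSC::Machete::Job showing only the account fallback.
class AccountSketchJob
  class << self
    # class-level default, used by every instance that does not override it
    attr_accessor :default_account_string
  end

  attr_reader :account_string

  def initialize(args = {})
    # same fallback expression as in Job#initialize
    @account_string = args[:account_string] || self.class.default_account_string
  end
end

AccountSketchJob.default_account_string = 'PZS0001'   # hypothetical account name

puts AccountSketchJob.new.account_string                            # falls back to the default
puts AccountSketchJob.new(account_string: 'PAS0002').account_string # per-job override
```

Because the default is read when the instance is created, changing it later does not affect jobs that already exist.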
Instance Attribute Details
#account_string ⇒ Object (readonly)
Returns the value of attribute account_string.
# File 'lib/osc/machete/job.rb', line 59
def account_string
  @account_string
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
# File 'lib/osc/machete/job.rb', line 59
def host
  @host
end
#pbsid ⇒ String? (readonly)
Returns the PBS job id, or nil if not set.
# File 'lib/osc/machete/job.rb', line 58
class OSC::Machete::Job
  attr_reader :pbsid, :script_path, :account_string, :host

  class << self
    # set this to change the billable account that is used by default
    attr_accessor :default_account_string
  end

  # Create Job instance to represent an unsubmitted batch job from the specified
  # script, or an existing, already submitted batch job from the specified pbsid
  #
  # Takes params in options hash as single argument:
  #
  #     Job.new(script: '/path/to/job/dir/go.sh')
  #
  # or
  #
  #     opts = { script: '/path/to/job/dir/go.sh' }
  #     Job.new(opts)
  #
  # Job class makes assumption that a job's PBS_O_WORKDIR will be
  # in the directory containing the shell script that is run.
  #
  # @param [Hash] args the arguments to create the job
  # @option args [String] :script full path to script (optional)
  # @option args [String, nil] :pbsid pbsid of a job already submitted (optional)
  # @option args [TorqueHelper, nil] :torque_helper override default torque helper (optional)
  #   NOTE: used for testing purposes
  #   we could use it also if we had different
  #   torque_helper classes for different systems
  def initialize(args)
    @script_path = Pathname.new(args[:script]).cleanpath unless args[:script].nil?
    # @script_path = @script_path.expand_path would change this to absolute path

    @pbsid = args[:pbsid]
    @host = args[:host]
    @torque = args[:torque_helper] || OSC::Machete::TorqueHelper.default
    @account_string = args[:account_string] || self.class.default_account_string

    @dependencies = {} # {:afterany => [Job, Job], :afterok => [Job]}
  end

  # @return [String, nil] script name or nil if instance wasn't initialized with a script
  def script_name
    Pathname.new(@script_path).basename.to_s if @script_path
  end

  # @return [String, nil] job directory or nil if instance wasn't initialized with a script
  def path
    Pathname.new(@script_path).dirname if @script_path
  end

  # Submit any dependent jobs that haven't been submitted
  # then submit this job, specifying dependencies as required by Torque.
  # Submitting includes cd-ing into the script's directory and qsub-ing from
  # that location, ensuring that environment variable PBS_O_WORKDIR is
  # set to the directory containing the script.
  #
  # @raise [ScriptMissingError] Raised when the path to the script does not exist or cannot be read.
  def submit
    return if submitted?

    raise ScriptMissingError, "#{script_path} does not exist or cannot be read" unless script_path.file? && script_path.readable?

    # submit any dependent jobs that have not yet been submitted
    submit_dependencies

    # cd into directory, submit job from there
    # so that PBS_O_WORKDIR is set to location
    # where job is run
    #
    #TODO: you can set PBS_O_WORKDIR via qsub args, is this necessary? there is
    # another env var besides PBS_O_WORKDIR that is affected by the path of the
    # current directory when the job is submitted
    #
    #TODO: what if you want to submit via piping to qsub i.e. without creating a file?
    Dir.chdir(path.to_s) do
      @pbsid = @torque.qsub script_name, depends_on: dependency_ids, host: @host, account_string: account_string
    end
  end

  # Check whether the job has been submitted.
  #
  # @return [Boolean] true if @pbsid is set
  def submitted?
    ! @pbsid.nil?
  end

  # Perform a qstat and return the status of the job.
  #
  # @return [Status] value object representing status of a job
  def status
    if @pbsid.nil?
      OSC::Machete::Status.not_submitted
    else
      @torque.qstat @pbsid, host: @host
    end
  end

  # Ensure Job starts only after the specified Job(s) complete
  #
  # @param [Job, Array<Job>] jobs Job(s) that this Job should depend on (wait for)
  # @return [self] self so you can chain method calls
  def afterany(jobs)
    add_dependencies(:afterany, jobs)
  end

  # Ensure Job starts only after the specified Job(s) complete with successful
  # return value.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def afterok(jobs)
    add_dependencies(:afterok, jobs)
  end

  # Ensure Job starts only after the specified Job(s) start.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def after(jobs)
    add_dependencies(:after, jobs)
  end

  # Ensure Job starts only after the specified Job(s) complete with error
  # return value.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def afternotok(jobs)
    add_dependencies(:afternotok, jobs)
  end

  # Kill the currently running batch job
  #
  # @param [Boolean] rmdir (false) if true, recursively remove the containing directory
  #   of the job script if killing the job succeeded
  #
  # @return [nil]
  def delete(rmdir: false)
    # FIXME: rethink this interface... should qdel be idempotent?
    # After first call, no errors thrown after?

    if pbsid
      @torque.qdel(pbsid, host: @host)

      # FIXME: removing a directory is always a dangerous action.
      # I wonder if we can add more tests to make sure we don't delete
      # something if the script name is munged

      # recursively delete the directory after killing the job
      Pathname.new(path).rmtree if path && rmdir && File.exist?(path)
    end
  end

  # Error class thrown when script is not available.
  class ScriptMissingError < StandardError; end

  private

  def submit_dependencies
    # assumes each dependency is a Job object
    @dependencies.values.flatten.each { |j| j.submit }
  end

  # build a dictionary of ids for each dependency type
  def dependency_ids
    ids = {}

    @dependencies.each do |type, jobs|
      ids[type] = jobs.map(&:pbsid).compact
    end

    ids.keep_if { |k,v| ! v.empty? }
  end

  def add_dependencies(type, jobs)
    @dependencies[type] = [] unless @dependencies.has_key?(type)
    @dependencies[type].concat(Array(jobs))

    self
  end
end
#script_path ⇒ String? (readonly)
Returns path of the job script, or nil if not set.
Instance Method Details
#after(jobs) ⇒ self
Ensure Job starts only after the specified Job(s) start.
# File 'lib/osc/machete/job.rb', line 177
def after(jobs)
  add_dependencies(:after, jobs)
end
#afterany(jobs) ⇒ self
Ensure Job starts only after the specified Job(s) complete, with any exit status.
# File 'lib/osc/machete/job.rb', line 160
def afterany(jobs)
  add_dependencies(:afterany, jobs)
end
#afternotok(jobs) ⇒ self
Ensure Job starts only after the specified Job(s) complete with error return value.
# File 'lib/osc/machete/job.rb', line 186
def afternotok(jobs)
  add_dependencies(:afternotok, jobs)
end
#afterok(jobs) ⇒ self
Ensure Job starts only after the specified Job(s) complete with successful return value.
# File 'lib/osc/machete/job.rb', line 169
def afterok(jobs)
  add_dependencies(:afterok, jobs)
end
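All four dependency methods return self, so calls can be chained, and each accepts either a single Job or an array of Jobs. The sketch below is a simplified stand-in that mirrors how dependencies are accumulated per type and how ids are collected at submit time. `DependencySketchJob` is a made-up name, and `dependency_ids` is public here only so the result can be inspected (it is private in the real class).

```ruby
# Simplified stand-in mirroring the dependency bookkeeping of OSC::Machete::Job.
class DependencySketchJob
  attr_reader :pbsid

  def initialize(pbsid = nil)
    @pbsid = pbsid
    @dependencies = {}   # e.g. { afterok: [job, job], afterany: [job] }
  end

  def afterany(jobs)
    add_dependencies(:afterany, jobs)
  end

  def afterok(jobs)
    add_dependencies(:afterok, jobs)
  end

  # Collect the ids of dependencies that have one, dropping empty types.
  def dependency_ids
    ids = {}
    @dependencies.each { |type, jobs| ids[type] = jobs.map(&:pbsid).compact }
    ids.reject { |_type, pbsids| pbsids.empty? }
  end

  private

  def add_dependencies(type, jobs)
    (@dependencies[type] ||= []).concat(Array(jobs))   # accepts one job or an array
    self                                               # allows chaining
  end
end

a = DependencySketchJob.new('1.oak-batch')
b = DependencySketchJob.new('2.oak-batch')
c = DependencySketchJob.new                 # no pbsid: not submitted yet
job = DependencySketchJob.new.afterok([a, b]).afterany(c)
p job.dependency_ids
```

In the real class, `submit` first submits every dependency that has no pbsid yet, so by the time the ids are collected each dependency normally has one. In this sketch `c` is never submitted, which is why its type is dropped from the result.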
#delete(rmdir: false) ⇒ nil
Kill the currently running batch job.
# File 'lib/osc/machete/job.rb', line 196
def delete(rmdir: false)
  # FIXME: rethink this interface... should qdel be idempotent?
  # After first call, no errors thrown after?

  if pbsid
    @torque.qdel(pbsid, host: @host)

    # FIXME: removing a directory is always a dangerous action.
    # I wonder if we can add more tests to make sure we don't delete
    # something if the script name is munged

    # recursively delete the directory after killing the job
    Pathname.new(path).rmtree if path && rmdir && File.exist?(path)
  end
end
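The behavior of delete depends on two conditions: nothing happens unless a pbsid is set, and the job directory is removed only when `rmdir: true` is passed. The sketch below reproduces that control flow as a free-standing method against a temporary directory. `QdelRecorder` and `delete_job` are hypothetical names used only for this illustration, and the recorder merely logs calls instead of contacting a batch server.

```ruby
require 'fileutils'
require 'pathname'
require 'tmpdir'

# Hypothetical stub standing in for the torque helper; it only records calls.
class QdelRecorder
  attr_reader :deleted

  def initialize
    @deleted = []
  end

  def qdel(pbsid, host: nil)
    @deleted << pbsid
  end
end

# Free-standing version of the delete logic, for illustration only.
def delete_job(torque, pbsid, path, rmdir: false)
  return unless pbsid                  # never submitted: nothing to kill
  torque.qdel(pbsid)
  # remove the job directory only when explicitly requested
  Pathname.new(path).rmtree if path && rmdir && File.exist?(path)
  nil
end

torque = QdelRecorder.new
dir = Dir.mktmpdir('job')

delete_job(torque, nil, dir, rmdir: true)            # no pbsid: no qdel, directory kept
puts File.directory?(dir)
delete_job(torque, '1.fake-batch', dir)              # qdel only, directory kept
puts File.directory?(dir)
delete_job(torque, '1.fake-batch', dir, rmdir: true) # qdel, then directory removed
puts File.exist?(dir)
puts torque.deleted.length
```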
#path ⇒ String?
Returns job directory or nil if instance wasn’t initialized with a script.
# File 'lib/osc/machete/job.rb', line 106
def path
  Pathname.new(@script_path).dirname if @script_path
end
#script_name ⇒ String?
Returns script name or nil if instance wasn’t initialized with a script.
# File 'lib/osc/machete/job.rb', line 101
def script_name
  Pathname.new(@script_path).basename.to_s if @script_path
end
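Both path and script_name are derived from the cleaned script path using Ruby's standard Pathname class. A short sketch of those calls on their own, using the script path from the overview with an extra `./` segment added here to show what cleanpath does:

```ruby
require 'pathname'

# cleanpath normalizes the string without touching the filesystem
script_path = Pathname.new('/nfs/17/efranz/jobs/1/./script.sh').cleanpath

dir  = script_path.dirname         # job directory, still a Pathname
name = script_path.basename.to_s   # script name as a String

puts script_path
puts dir
puts name
```

Note that `dirname` returns a Pathname rather than a String, which is why submit calls `path.to_s` before changing directory.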
#status ⇒ Status
Perform a qstat and return a Status value object representing the status of the job.
# File 'lib/osc/machete/job.rb', line 148
def status
  if @pbsid.nil?
    OSC::Machete::Status.not_submitted
  else
    @torque.qstat @pbsid, host: @host
  end
end
#submit ⇒ Object
Submit any jobs this job depends on that haven’t been submitted, then submit this job, specifying dependencies as required by Torque. Submitting includes cd-ing into the script’s directory and qsub-ing from that location, ensuring that environment variable PBS_O_WORKDIR is set to the directory containing the script. Raises ScriptMissingError when the path to the script does not exist or cannot be read.
# File 'lib/osc/machete/job.rb', line 117
def submit
  return if submitted?

  raise ScriptMissingError, "#{script_path} does not exist or cannot be read" unless script_path.file? && script_path.readable?

  # submit any dependent jobs that have not yet been submitted
  submit_dependencies

  # cd into directory, submit job from there
  # so that PBS_O_WORKDIR is set to location
  # where job is run
  #
  #TODO: you can set PBS_O_WORKDIR via qsub args, is this necessary? there is
  # another env var besides PBS_O_WORKDIR that is affected by the path of the
  # current directory when the job is submitted
  #
  #TODO: what if you want to submit via piping to qsub i.e. without creating a file?
  Dir.chdir(path.to_s) do
    @pbsid = @torque.qsub script_name, depends_on: dependency_ids, host: @host, account_string: account_string
  end
end
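The block form of `Dir.chdir` is what makes the submission happen from the script's directory while leaving the caller's working directory unchanged afterwards. The sketch below shows that pattern in isolation with a temporary directory. `QsubRecorder` is a hypothetical stub that records where qsub was invoked instead of talking to Torque, and the returned job id is made up.

```ruby
require 'tmpdir'

# Hypothetical stub for the torque helper: records the directory qsub was
# called from instead of contacting a batch server.
class QsubRecorder
  attr_reader :submitted_from

  def qsub(script_name, **_opts)
    @submitted_from = Dir.pwd
    '1.fake-batch'                     # made-up job id
  end
end

torque = QsubRecorder.new
original_dir = Dir.pwd
pbsid = nil

Dir.mktmpdir('job') do |job_dir|
  File.write(File.join(job_dir, 'script.sh'), "#!/bin/bash\necho hello\n")

  # Same pattern as Job#submit: the directory change lasts only for the block.
  Dir.chdir(job_dir) do
    pbsid = torque.qsub('script.sh', depends_on: {})
  end
end

puts pbsid
puts torque.submitted_from        # the temporary job directory
puts Dir.pwd == original_dir      # the working directory is restored after the block
```

Injecting a stub like this through the `:torque_helper` option is the testing use case that the constructor documentation mentions.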
#submitted? ⇒ Boolean
Check whether the job has been submitted.
# File 'lib/osc/machete/job.rb', line 141
def submitted?
  ! @pbsid.nil?
end