Class: OSC::Machete::Job

Inherits:
Object
Defined in:
lib/osc/machete/job.rb

Overview

Core object for working with batch jobs, including:

  • submitting jobs

  • checking job status

  • setting dependencies between jobs via a directed acyclic graph

Create a new Job from a script:

job = Job.new(script: '/nfs/17/efranz/jobs/1/script.sh')
job.submitted? #=> false
job.path #=> #<Pathname:/nfs/17/efranz/jobs/1>
job.script_name #=> 'script.sh'
job.status #=> nil
job.pbsid #=> nil

# PBS_O_WORKDIR will be set to the directory containing the script
job.submit

job.submitted? #=> true
job.status #=> "Q"
job.pbsid #=> "3422735.oak-batch"

# if you know the pbs id you can instantiate a
# Job object to ask for the status of it
job2 = Job.new(pbsid: "3422735.oak-batch")
job2.status #=> "Q"

# because the object was created with only the pbsid passed in,
# path, script_name, and dependency information are not available
job2.path #=> nil
job2.script_name #=> nil

# but an unknown pbsid results in status nil
job3 = Job.new(pbsid: "12345.oak-batch")
job3.status #=> nil

Create two Job instances and form a dependency between them:

job1 = Job.new(script: '/nfs/17/efranz/jobs/1/script.sh')
job2 = Job.new(script: '/nfs/17/efranz/jobs/1/post.sh')

job2.afterany(job1) # job2 runs after job1 completes with any exit status

job1.submit
job2.submit

job1.status #=> "Q"
job2.status #=> "H"
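The submission order in the dependency example above can be illustrated without a batch system. The sketch below is a hypothetical, trimmed stand-in (`SketchJob` and `FakeScheduler` are illustrative names, not part of osc-machete) that mirrors the dependency bookkeeping in the class source: calling `submit` on a job first submits any unsubmitted jobs it depends on, then passes their ids as `depends_on:`. The real class also changes into the script's directory and talks to Torque through its torque helper, which this sketch omits.

```ruby
# Hypothetical scheduler stand-in: records each qsub call and hands out fake ids.
class FakeScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def qsub(script_name, depends_on: {})
    pbsid = "#{@calls.size + 1}.fake-batch"
    @calls << [script_name, pbsid, depends_on]
    pbsid
  end
end

# Hypothetical stand-in for OSC::Machete::Job that keeps only the
# dependency bookkeeping shown in the class source.
class SketchJob
  attr_reader :script_name, :pbsid

  def initialize(script_name, scheduler)
    @script_name = script_name
    @scheduler = scheduler
    @pbsid = nil
    @dependencies = {} # e.g. {afterany: [job_a], afterok: [job_b]}
  end

  def submitted?
    !@pbsid.nil?
  end

  def afterany(jobs)
    add_dependencies(:afterany, jobs)
  end

  # Submit unsubmitted parents first, then this job with its parents' ids.
  def submit
    return if submitted?

    @dependencies.values.flatten.each(&:submit)
    @pbsid = @scheduler.qsub(script_name, depends_on: dependency_ids)
  end

  private

  # {type => [pbsid, ...]}, dropping types that have no submitted ids
  def dependency_ids
    ids = @dependencies.transform_values { |jobs| jobs.map(&:pbsid).compact }
    ids.reject { |_type, list| list.empty? }
  end

  def add_dependencies(type, jobs)
    (@dependencies[type] ||= []).concat(Array(jobs))
    self # allows chaining
  end
end

scheduler = FakeScheduler.new
job1 = SketchJob.new('script.sh', scheduler)
job2 = SketchJob.new('post.sh', scheduler).afterany(job1)

job2.submit # submits job1 first, then job2 with depends_on: {afterany: ["1.fake-batch"]}
scheduler.calls.each { |call| p call }
```

Because `submit` returns early once a pbsid is set, a job shared by several dependents in the graph is submitted only once.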

Defined Under Namespace

Classes: ScriptMissingError

Constructor Details

#initialize(args) ⇒ Job

Create a Job instance representing either an unsubmitted batch job built from the specified script, or an existing, already submitted batch job identified by the specified pbsid.

Takes its parameters as a single options hash:

Job.new(script: '/path/to/job/dir/go.sh')

or

opts = { script: '/path/to/job/dir/go.sh' }
Job.new(opts)

The Job class assumes that a job’s PBS_O_WORKDIR is the directory containing the shell script that is run.
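Because PBS_O_WORKDIR is tied to the script's directory, it helps to see how the `:script` option is split. The sketch below reproduces, with plain `Pathname` calls, what `initialize`, `#path`, and `#script_name` do with that option; the input path is only an example. Note that `#path` yields a `Pathname`, while `#script_name` yields a `String`.

```ruby
require 'pathname'

# Example input: a script path that is not yet normalized.
raw = '/nfs/17/efranz/jobs/1/./script.sh'

# Job#initialize stores the cleaned path (lexical cleanup, no filesystem access).
script_path = Pathname.new(raw).cleanpath

# Job#path: the directory qsub is run from, and so the expected PBS_O_WORKDIR.
job_dir = script_path.dirname

# Job#script_name: the bare file name handed to qsub.
script_name = script_path.basename.to_s

puts script_path # /nfs/17/efranz/jobs/1/script.sh
puts job_dir     # /nfs/17/efranz/jobs/1
puts script_name # script.sh
```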

Parameters:

  • args (Hash)

    the arguments to create the job

Options Hash (args):

  • :script (String)

    full path to script (optional)

  • :pbsid (String, nil)

    pbsid of a job already submitted (optional)

  • :torque_helper (TorqueHelper, nil)

    override the default torque helper (optional). NOTE: used for testing purposes; it could also be used if different TorqueHelper classes were needed for different systems.
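The `:torque_helper` option makes it possible to swap Torque out in tests. Below is a minimal sketch of such a test double. `FakeTorqueHelper` is a hypothetical name, and the state letters it returns are illustrative; the only thing taken from the class source is the set of calls `Job` makes (`qsub` with `depends_on:`, `host:`, and `account_string:`, plus `qstat` and `qdel` with `host:`). The real helper's return values (for example, `qstat` returning a `Status` object) are not reproduced.

```ruby
# Hypothetical test double; its method names and keyword arguments mirror
# the calls Job makes on its torque helper in the class source.
class FakeTorqueHelper
  attr_reader :submitted, :deleted

  def initialize
    @submitted = []
    @deleted = []
    @next_id = 100
  end

  # Job#submit calls: qsub(script_name, depends_on:, host:, account_string:)
  def qsub(script_name, depends_on: {}, host: nil, account_string: nil)
    pbsid = "#{@next_id}.fake-batch"
    @next_id += 1
    @submitted << { script: script_name, pbsid: pbsid, depends_on: depends_on,
                    host: host, account_string: account_string }
    pbsid
  end

  # Job#status calls: qstat(pbsid, host:)
  # Assumption: a plain status letter stands in for the real Status object.
  def qstat(pbsid, host: nil)
    @deleted.include?(pbsid) ? 'C' : 'Q'
  end

  # Job#delete calls: qdel(pbsid, host:)
  def qdel(pbsid, host: nil)
    @deleted << pbsid
    nil
  end
end

torque = FakeTorqueHelper.new
id = torque.qsub('go.sh', depends_on: {}, host: nil, account_string: 'hypothetical_account')
puts id               # 100.fake-batch
puts torque.qstat(id) # Q
torque.qdel(id)
puts torque.qstat(id) # C
```

Such a double could then be injected with `Job.new(script: '/path/to/job/dir/go.sh', torque_helper: FakeTorqueHelper.new)`.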



# File 'lib/osc/machete/job.rb', line 88

def initialize(args)
  @script_path = Pathname.new(args[:script]).cleanpath unless args[:script].nil?
  # @script_path = @script_path.expand_path would change this to absolute path

  @pbsid =  args[:pbsid]
  @host =   args[:host]
  @torque = args[:torque_helper] || OSC::Machete::TorqueHelper.default
  @account_string = args[:account_string] || self.class.default_account_string

  @dependencies = {} # {:afterany => [Job, Job], :afterok => [Job]}
end

Class Attribute Details

.default_account_string ⇒ Object

Set this to change the billable account that is used by default.



# File 'lib/osc/machete/job.rb', line 63

def default_account_string
  @default_account_string
end
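Because `initialize` falls back to `self.class.default_account_string` only when no `:account_string` option is given, the class attribute acts as a process-wide default. The sketch below uses a hypothetical `AccountedJob` class and made-up account names to show that precedence, and that the default is captured when an instance is created. With the real class, the equivalent setting would be `OSC::Machete::Job.default_account_string = ...`.

```ruby
# Hypothetical class reproducing only the account-string pattern from Job.
class AccountedJob
  class << self
    # Class-level setting shared by every instance (like Job.default_account_string).
    attr_accessor :default_account_string
  end

  attr_reader :account_string

  def initialize(args = {})
    # A per-job :account_string wins; otherwise fall back to the class default.
    @account_string = args[:account_string] || self.class.default_account_string
  end
end

AccountedJob.default_account_string = 'PROJ_DEFAULT' # hypothetical account name
a = AccountedJob.new
b = AccountedJob.new(account_string: 'PROJ_OTHER')

puts a.account_string # PROJ_DEFAULT
puts b.account_string # PROJ_OTHER

# Changing the default later affects only instances created afterwards.
AccountedJob.default_account_string = 'PROJ_NEW'
puts a.account_string                   # PROJ_DEFAULT
puts AccountedJob.new.account_string    # PROJ_NEW
```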

Instance Attribute Details

#account_string ⇒ Object (readonly)

Returns the value of attribute account_string.



# File 'lib/osc/machete/job.rb', line 59

def account_string
  @account_string
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/osc/machete/job.rb', line 59

def host
  @host
end

#pbsid ⇒ String? (readonly)

Returns the PBS job id, or nil if not set.

Returns:

  • (String, nil)

    the PBS job id, or nil if not set



# File 'lib/osc/machete/job.rb', line 58

class OSC::Machete::Job
  attr_reader :pbsid, :script_path, :account_string, :host

  class << self
    # set this to change the billable account that is used by default
    attr_accessor :default_account_string
  end

  # Create Job instance to represent an unsubmitted batch job from the specified
  # script, or an existing, already submitted batch job from the specified pbsid
  #
  # Takes params in options hash as single argument:
  #
  #     Job.new(script: '/path/to/job/dir/go.sh')
  #
  # or
  #
  #     opts = { script: '/path/to/job/dir/go.sh' }
  #     Job.new(opts)
  #
  # Job class makes assumption that a job's PBS_O_WORKDIR will be
  # in the directory containing the shell script that is run.
  #
  # @param [Hash] args the arguments to create the job
  # @option args [String] :script  full path to script (optional)
  # @option args [String, nil] :pbsid   pbsid of a job already submitted (optional)
  # @option args [TorqueHelper, nil]  :torque_helper  override default torque helper (optional)
  #                       NOTE: used for testing purposes
  #                       we could use it also if we had different
  #                       torque_helper classes for different systems
  def initialize(args)
    @script_path = Pathname.new(args[:script]).cleanpath unless args[:script].nil?
    # @script_path = @script_path.expand_path would change this to absolute path

    @pbsid =  args[:pbsid]
    @host =   args[:host]
    @torque = args[:torque_helper] || OSC::Machete::TorqueHelper.default
    @account_string = args[:account_string] || self.class.default_account_string

    @dependencies = {} # {:afterany => [Job, Job], :afterok => [Job]}
  end

  # @return [String, nil] script name or nil if instance wasn't initialized with a script
  def script_name
    Pathname.new(@script_path).basename.to_s if @script_path
  end

  # @return [Pathname, nil] job directory or nil if instance wasn't initialized with a script
  def path
    Pathname.new(@script_path).dirname if @script_path
  end

  # Submit any dependent jobs that haven't been submitted
  # then submit this job, specifying dependencies as required by Torque.
  # Submitting includes cd-ing into the script's directory and qsub-ing from
  # that location, ensuring that environment variable PBS_O_WORKDIR is
  # set to the directory containing the script.
  #
  # @raise [ScriptMissingError] Raised when the path to the script does not exist or cannot be read.
  def submit
    return if submitted?
    raise ScriptMissingError, "#{script_path} does not exist or cannot be read" unless script_path.file? && script_path.readable?

    # submit any dependent jobs that have not yet been submitted
    submit_dependencies

    # cd into directory, submit job from there
    # so that PBS_O_WORKDIR is set to location
    # where job is run
    #
    #TODO: you can set PBS_O_WORKDIR via qsub args, is this necessary? there is
    # another env var besides PBS_O_WORKDIR that is affected by the path of the
    # current directory when the job is submitted
    #
    #TODO: what if you want to submit via piping to qsub i.e. without creating a file?
    Dir.chdir(path.to_s) do
      @pbsid = @torque.qsub script_name, depends_on: dependency_ids, host: @host, account_string: account_string
    end
  end

  # Check whether the job has been submitted.
  #
  # @return [Boolean] true if @pbsid is set
  def submitted?
    ! @pbsid.nil?
  end

  # Perform a qstat and return the status of the job.
  #
  # @return [Status] value object representing status of a job
  def status
    if @pbsid.nil?
      OSC::Machete::Status.
    else
      @torque.qstat @pbsid, host: @host
    end
  end

  # Ensure Job starts only after the specified Job(s) complete
  #
  # @param [Job, Array<Job>] jobs Job(s) that this Job should depend on (wait for)
  # @return [self] self so you can chain method calls
  def afterany(jobs)
    add_dependencies(:afterany, jobs)
  end

  # Ensure Job starts only after the specified Job(s) complete with successful
  # return value.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def afterok(jobs)
    add_dependencies(:afterok, jobs)
  end

  # Ensure Job starts only after the specified Job(s) start.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def after(jobs)
    add_dependencies(:after, jobs)
  end

  # Ensure Job starts only after the specified Job(s) complete with error
  # return value.
  #
  # @param (see #afterany)
  # @return (see #afterany)
  def afternotok(jobs)
    add_dependencies(:afternotok, jobs)
  end

  # Kill the currently running batch job
  #
  # @param [Boolean] rmdir (false) if true, recursively remove the containing directory
  #                                of the job script if killing the job succeeded
  #
  # @return [nil]
  def delete(rmdir: false)
    # FIXME: rethink this interface... should qdel be idempotent?
    # After first call, no errors thrown after?

    if pbsid

      @torque.qdel(pbsid, host: @host)
      # FIXME: removing a directory is always a dangerous action.
      # I wonder if we can add more tests to make sure we don't delete
      # something if the script name is munged

      # recursively delete the directory after killing the job
      Pathname.new(path).rmtree if path && rmdir && File.exist?(path)
    end
  end

  # Error class thrown when script is not available.
  class ScriptMissingError < StandardError; end

  private

  def submit_dependencies
    #  assumes each dependency is a Job object
    @dependencies.values.flatten.each { |j| j.submit }
  end

  # build a dictionary of ids for each dependency type
  def dependency_ids
    ids = {}

    @dependencies.each do |type, jobs|
      ids[type] = jobs.map(&:pbsid).compact
    end

    ids.keep_if { |k,v| ! v.empty? }
  end

  def add_dependencies(type, jobs)
    @dependencies[type] = [] unless @dependencies.has_key?(type)
    @dependencies[type].concat(Array(jobs))

    self
  end
end

#script_path ⇒ Pathname? (readonly)

Returns the path of the job script, or nil if not set.

Returns:

  • (Pathname, nil)

    path of the job script, or nil if not set



# File 'lib/osc/machete/job.rb', line 58


Instance Method Details

#after(jobs) ⇒ self

Ensure Job starts only after the specified Job(s) start.

Parameters:

  • jobs (Job, Array<Job>)

    Job(s) that this Job should depend on (wait for)

Returns:

  • (self)

    self so you can chain method calls



# File 'lib/osc/machete/job.rb', line 177

def after(jobs)
  add_dependencies(:after, jobs)
end

#afterany(jobs) ⇒ self

Ensure Job starts only after the specified Job(s) complete

Parameters:

  • jobs (Job, Array<Job>)

    Job(s) that this Job should depend on (wait for)

Returns:

  • (self)

    self so you can chain method calls



# File 'lib/osc/machete/job.rb', line 160

def afterany(jobs)
  add_dependencies(:afterany, jobs)
end

#afternotok(jobs) ⇒ self

Ensure Job starts only after the specified Job(s) complete with error return value.

Parameters:

  • jobs (Job, Array<Job>)

    Job(s) that this Job should depend on (wait for)

Returns:

  • (self)

    self so you can chain method calls



# File 'lib/osc/machete/job.rb', line 186

def afternotok(jobs)
  add_dependencies(:afternotok, jobs)
end

#afterok(jobs) ⇒ self

Ensure Job starts only after the specified Job(s) complete with successful return value.

Parameters:

  • jobs (Job, Array<Job>)

    Job(s) that this Job should depend on (wait for)

Returns:

  • (self)

    self so you can chain method calls



# File 'lib/osc/machete/job.rb', line 169

def afterok(jobs)
  add_dependencies(:afterok, jobs)
end

#delete(rmdir: false) ⇒ nil

Kill the currently running batch job

Parameters:

  • rmdir (Boolean) (defaults to: false)

    If true, recursively remove the directory containing the job script once the job has been killed successfully.

Returns:

  • (nil)


# File 'lib/osc/machete/job.rb', line 196

def delete(rmdir: false)
  # FIXME: rethink this interface... should qdel be idempotent?
  # After first call, no errors thrown after?

  if pbsid

    @torque.qdel(pbsid, host: @host)
    # FIXME: removing a directory is always a dangerous action.
    # I wonder if we can add more tests to make sure we don't delete
    # something if the script name is munged

    # recursively delete the directory after killing the job
    Pathname.new(path).rmtree if path && rmdir && File.exist?(path)
  end
end
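With `rmdir: true`, `delete` calls `Pathname#rmtree` on the script's directory, so anything else stored there (outputs, logs) is removed as well; this is the risk the FIXME comment points at. The sketch below reproduces just that guarded call inside a throwaway temporary directory; the directory layout and file names are made up, and no job is killed.

```ruby
require 'fileutils'
require 'pathname'
require 'tmpdir'

removed = nil
parent_kept = nil

Dir.mktmpdir do |root|
  # Throwaway stand-in for a job directory (hypothetical layout).
  job_dir = Pathname.new(root) + 'jobs' + '1'
  job_dir.mkpath
  script = job_dir + 'script.sh'
  script.write("#!/bin/bash\necho hello\n")
  (job_dir + 'output.log').write("results\n") # unrelated file in the same directory

  # Same guard and call as Job#delete, with rmdir: true and path = script.dirname.
  rmdir = true
  path = script.dirname
  Pathname.new(path).rmtree if path && rmdir && File.exist?(path)

  removed = !job_dir.exist?                          # script and output.log are both gone
  parent_kept = (Pathname.new(root) + 'jobs').exist? # only the job directory itself was removed
end

puts removed     # true
puts parent_kept # true
```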

#path ⇒ Pathname?

Returns the job directory, or nil if the instance wasn’t initialized with a script.

Returns:

  • (Pathname, nil)

    job directory or nil if instance wasn’t initialized with a script



# File 'lib/osc/machete/job.rb', line 106

def path
  Pathname.new(@script_path).dirname if @script_path
end

#script_name ⇒ String?

Returns script name or nil if instance wasn’t initialized with a script.

Returns:

  • (String, nil)

    script name or nil if instance wasn’t initialized with a script



# File 'lib/osc/machete/job.rb', line 101

def script_name
  Pathname.new(@script_path).basename.to_s if @script_path
end

#status ⇒ Status

Perform a qstat and return the status of the job.

Returns:

  • (Status)

    value object representing status of a job



# File 'lib/osc/machete/job.rb', line 148

def status
  if @pbsid.nil?
    OSC::Machete::Status.
  else
    @torque.qstat @pbsid, host: @host
  end
end

#submit ⇒ Object

Submit any dependent jobs that haven’t been submitted, then submit this job, specifying dependencies as required by Torque. Submitting includes changing into the script’s directory and running qsub from there, so that the environment variable PBS_O_WORKDIR is set to the directory containing the script.

Raises:

  • (ScriptMissingError)

    Raised when the path to the script does not exist or cannot be read.



# File 'lib/osc/machete/job.rb', line 117

def submit
  return if submitted?
  raise ScriptMissingError, "#{script_path} does not exist or cannot be read" unless script_path.file? && script_path.readable?

  # submit any dependent jobs that have not yet been submitted
  submit_dependencies

  # cd into directory, submit job from there
  # so that PBS_O_WORKDIR is set to location
  # where job is run
  #
  #TODO: you can set PBS_O_WORKDIR via qsub args, is this necessary? there is
  # another env var besides PBS_O_WORKDIR that is affected by the path of the
  # current directory when the job is submitted
  #
  #TODO: what if you want to submit via piping to qsub i.e. without creating a file?
  Dir.chdir(path.to_s) do
    @pbsid = @torque.qsub script_name, depends_on: dependency_ids, host: @host, account_string: account_string
  end
end
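`submit` hands `dependency_ids` (a hash such as `{afterany: [...], afterok: [...]}`) to the torque helper as `depends_on:`. How the torque helper renders that hash is not shown on this page; the sketch below is one plausible encoding into Torque's `qsub -W depend=type:jobid[:jobid...][,type:...]` syntax, written as a hypothetical `depend_option` method. The job ids are reused from the overview example.

```ruby
# Hypothetical helper: turns a depends_on hash such as the one built by
# Job#dependency_ids into the value of Torque's "-W" option.
# Assumption: one possible encoding, not the torque helper's actual code.
def depend_option(depends_on)
  return nil if depends_on.nil? || depends_on.empty?

  # Each dependency type becomes "type:id1:id2"; types are comma-separated.
  clauses = depends_on.map { |type, ids| ([type] + ids).join(':') }
  "depend=#{clauses.join(',')}"
end

deps = {
  afterany: ['3422735.oak-batch'],
  afterok: ['3422736.oak-batch', '3422737.oak-batch']
}
opt = depend_option(deps)
puts opt
# depend=afterany:3422735.oak-batch,afterok:3422736.oak-batch:3422737.oak-batch
```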

#submitted? ⇒ Boolean

Check whether the job has been submitted.

Returns:

  • (Boolean)

    true if @pbsid is set



# File 'lib/osc/machete/job.rb', line 141

def submitted?
  ! @pbsid.nil?
end