Class: OodCore::Job::Adapters::Torque

Inherits:
OodCore::Job::Adapter
Defined in:
lib/ood_core/job/adapters/torque.rb

Overview

An adapter object that describes the communication with a Torque resource manager for job management.

Constant Summary

STATE_MAP =

Mapping of PBS job state characters to OodCore job status symbols

{
  # PBS job_state character => state symbol handed to Status.new
  'Q' => :queued,
  'H' => :queued_held,
  'T' => :queued_held,    # transiting, most likely a held job
  'R' => :running,
  'S' => :suspended,
  'E' => :running,        # exiting, but still running
  'C' => :completed
}.freeze

Instance Method Summary

Constructor Details

#initialize(opts = {}) ⇒ Torque

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Torque

Parameters:

  • opts (#to_h) (defaults to: {})

    the options defining this adapter

Options Hash (opts):

  • :pbs (PBS::Batch)

    The PBS batch object (required; an ArgumentError is raised if it is missing)

See Also:



47
48
49
50
51
# File 'lib/ood_core/job/adapters/torque.rb', line 47

# Builds a Torque adapter around an existing PBS batch object.
#
# @param opts [#to_h] adapter options; must contain :pbs (string or symbol key)
# @raise [ArgumentError] when no :pbs entry is given
#
# NOTE(review): symbolize_keys is not core Ruby; it is presumably supplied
# by a Hash refinement or ActiveSupport elsewhere in the project.
def initialize(opts = {})
  options = opts.to_h.symbolize_keys
  @pbs = options.fetch(:pbs) do
    raise ArgumentError, "No pbs object specified. Missing argument: pbs"
  end
end

Instance Method Details

#delete(id) ⇒ void

This method returns an undefined value.

Delete the submitted job

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



203
204
205
206
207
208
209
210
211
# File 'lib/ood_core/job/adapters/torque.rb', line 203

# Deletes a submitted job.
#
# @param id [#to_s] the id of the job
# @return [void]
# @raise [JobAdapterError] if PBS reports any other error
def delete(id)
  job_id = id.to_s
  @pbs.delete_job(job_id)
rescue PBS::UnkjobidError, PBS::BadstateError
  # An unknown id means the job is already gone; a bad state means the job
  # is exiting or completed. Either way the deletion counts as successful.
  nil
rescue PBS::Error => err
  raise JobAdapterError, err.message
end

#hold(id) ⇒ void

This method returns an undefined value.

Put the submitted job on hold

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



175
176
177
178
179
180
181
182
# File 'lib/ood_core/job/adapters/torque.rb', line 175

# Places a hold on a submitted job.
#
# @param id [#to_s] the id of the job
# @return [void]
# @raise [JobAdapterError] if PBS reports any other error
def hold(id)
  job_id = id.to_s
  @pbs.hold_job(job_id)
rescue PBS::UnkjobidError
  # an unknown job id is treated as a successful hold
  nil
rescue PBS::Error => err
  raise JobAdapterError, err.message
end

#info(id) ⇒ Info

Retrieve job info from the resource manager

Parameters:

  • id (#to_s)

    the id of the job

Returns:

  • (Info)

    information describing submitted job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/ood_core/job/adapters/torque.rb', line 141

# Retrieves full job information from the resource manager.
#
# @param id [#to_s] the id of the job
# @return [Info] information describing the submitted job; a job PBS no
#   longer knows about is reported with status :completed
# @raise [JobAdapterError] if PBS reports any other error
def info(id)
  job_id = id.to_s
  # get_job returns { id => attributes }; flattening yields [id, attributes]
  # without relying on the returned key matching the requested id
  job_args = @pbs.get_job(job_id).flatten
  parse_job_info(*job_args)
rescue PBS::UnkjobidError
  Info.new(
    id: job_id,
    status: :completed
  )
rescue PBS::Error => err
  raise JobAdapterError, err.message
end

#info_all ⇒ Array<Info>

Retrieve info for all jobs from the resource manager

Returns:

  • (Array<Info>)

    information describing submitted jobs

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



128
129
130
131
132
133
134
# File 'lib/ood_core/job/adapters/torque.rb', line 128

# Retrieves information for every job known to the resource manager.
#
# @return [Array<Info>] one parsed entry per job returned by PBS
# @raise [JobAdapterError] if PBS reports an error
def info_all
  jobs = @pbs.get_jobs
  jobs.map { |job_id, attribs| parse_job_info(job_id, attribs) }
rescue PBS::Error => err
  raise JobAdapterError, err.message
end

#release(id) ⇒ void

This method returns an undefined value.

Release the job that is on hold

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



189
190
191
192
193
194
195
196
# File 'lib/ood_core/job/adapters/torque.rb', line 189

# Releases a job that is currently on hold.
#
# @param id [#to_s] the id of the job
# @return [void]
# @raise [JobAdapterError] if PBS reports any other error
def release(id)
  job_id = id.to_s
  @pbs.release_job(job_id)
rescue PBS::UnkjobidError
  # an unknown job id is treated as a successful release
  nil
rescue PBS::Error => err
  raise JobAdapterError, err.message
end

#status(id) ⇒ Status

Retrieve job status from resource manager

Parameters:

  • id (#to_s)

    the id of the job

Returns:

  • (Status)

    status of the job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



159
160
161
162
163
164
165
166
167
168
# File 'lib/ood_core/job/adapters/torque.rb', line 159

# Retrieves the current status of a job from the resource manager.
#
# @param id [#to_s] the id of the job
# @return [Status] the mapped state; :completed if PBS no longer knows the
#   job, :undetermined if the state character (or the job entry) is missing
# @raise [JobAdapterError] if PBS reports any other error
def status(id)
  id = id.to_s
  jobs = @pbs.get_job(id, filters: [:job_state])
  # Prefer the entry keyed by the requested id. Otherwise fall back to the
  # returned entry, as #info does by flattening the result, instead of
  # crashing on nil.
  # NOTE(review): assumes a mismatch comes from PBS keying by a different
  # form of the id (e.g. "1234.server") - confirm.
  attribs = jobs.fetch(id) { jobs.values.first } || {}
  Status.new(state: STATE_MAP.fetch(attribs[:job_state], :undetermined))
rescue PBS::UnkjobidError
  # set completed status if can't find job id
  Status.new(state: :completed)
rescue PBS::Error => e
  raise JobAdapterError, e.message
end

#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String

Submit a job with the attributes defined in the job template instance

Parameters:

  • script (Script)

    script object that describes the script and attributes for the submitted job

  • after (#to_s, Array<#to_s>)

    this job may be scheduled for execution at any point after dependent jobs have started execution

  • afterok (#to_s, Array<#to_s>)

    this job may be scheduled for execution only after dependent jobs have terminated with no errors

  • afternotok (#to_s, Array<#to_s>)

    this job may be scheduled for execution only after dependent jobs have terminated with errors

  • afterany (#to_s, Array<#to_s>)

    this job may be scheduled for execution after dependent jobs have terminated

Returns:

  • (String)

    the job id returned after successfully submitting a job

Raises:

  • (JobAdapterError)

    if the underlying PBS call raises a PBS::Error

See Also:



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/ood_core/job/adapters/torque.rb', line 67

# Submits a job built from the attributes of +script+.
#
# @param script [Script] script content and attributes for the submitted job
# @param after [#to_s, Array<#to_s>] job ids this job may start after (once they have started)
# @param afterok [#to_s, Array<#to_s>] job ids that must terminate without errors first
# @param afternotok [#to_s, Array<#to_s>] job ids that must terminate with errors first
# @param afterany [#to_s, Array<#to_s>] job ids that must terminate (any outcome) first
# @return [String] the job id returned by PBS
# @raise [JobAdapterError] if PBS reports an error while submitting
#
# The script object is never modified: its job_environment hash and
# start_time are only read.
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  after      = Array(after).map(&:to_s)
  afterok    = Array(afterok).map(&:to_s)
  afternotok = Array(afternotok).map(&:to_s)
  afterany   = Array(afterany).map(&:to_s)

  # Set headers
  headers = {}
  headers.merge!(job_arguments: script.args.join(' ')) unless script.args.nil?
  headers.merge!(Hold_Types: :u) if script.submit_as_hold
  headers.merge!(Rerunable: script.rerunnable ? 'y' : 'n') unless script.rerunnable.nil?
  headers.merge!(init_work_dir: script.workdir) unless script.workdir.nil?
  headers.merge!(Mail_Users: script.email.join(',')) unless script.email.nil?
  mail_points  = ''
  mail_points += 'b' if script.email_on_started
  mail_points += 'e' if script.email_on_terminated
  headers.merge!(Mail_Points: mail_points) unless mail_points.empty?
  headers.merge!(Job_Name: script.job_name) unless script.job_name.nil?
  # ignore input_path (not defined in Torque)
  headers.merge!(Output_Path: script.output_path) unless script.output_path.nil?
  headers.merge!(Error_Path: script.error_path) unless script.error_path.nil?
  # If error_path is not specified we join stdout & stderr (as this
  # mimics what the other resource managers do)
  headers.merge!(Join_Path: 'oe') if script.error_path.nil?
  headers.merge!(reservation_id: script.reservation_id) unless script.reservation_id.nil?
  headers.merge!(Priority: script.priority) unless script.priority.nil?
  unless script.start_time.nil?
    # getlocal returns a converted copy; localtime would convert the
    # caller's Time object in place.
    headers.merge!(Execution_Time: script.start_time.getlocal.strftime("%C%y%m%d%H%M.%S"))
  end
  headers.merge!(Account_Name: script.accounting_id) unless script.accounting_id.nil?

  # Set dependencies as "type:id1:id2,type:id3", skipping empty types and
  # keeping the fixed after/afterok/afternotok/afterany order
  dependencies = { after: after, afterok: afterok, afternotok: afternotok, afterany: afterany }
  depend = dependencies.reject { |_type, ids| ids.empty? }.map { |type, ids| "#{type}:#{ids.join(':')}" }
  headers.merge!(depend: depend.join(',')) unless depend.empty?

  # Set resources
  resources = {}
  resources.merge!(walltime: seconds_to_duration(script.wall_time)) unless script.wall_time.nil?

  # Set environment variables (the script's hash is only read, never mutated)
  envvars = script.job_environment || {}

  # Set native options; native values override the ones derived above
  if script.native
    headers.merge!(script.native.fetch(:headers, {}))
    resources.merge!(script.native.fetch(:resources, {}))
    # non-destructive merge so script.job_environment is left untouched
    envvars = envvars.merge(script.native.fetch(:envvars, {}))
  end

  # Submit job
  @pbs.submit_string(script.content, queue: script.queue_name, headers: headers, resources: resources, envvars: envvars)
rescue PBS::Error => e
  raise JobAdapterError, e.message
end