Class: OodCore::Job::Adapters::Sge::Batch Private

Inherits:
Object
  • Object
show all
Defined in:
lib/ood_core/job/adapters/sge/batch.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Object used for simplified communication with a SGE batch server

Defined Under Namespace

Classes: Error

Constant Summary collapse

STATE_MAP =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Adapted from www.softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml

{
  'EhRqw'   => :undetermined, # all pending states with error
  'Ehqw'    => :undetermined, # all pending states with error
  'Eqw'     => :undetermined, # all pending states with error
  'RS'      => :suspended,    # all suspended with re-submit
  'RT'      => :suspended,    # all suspended with re-submit
  'Rr'      => :running,      # running, re-submit
  'Rs'      => :suspended,    # all suspended with re-submit
  'Rt'      => :running,      # transferring, re-submit
  'RtS'     => :suspended,    # all suspended with re-submit
  'RtT'     => :suspended,    # all suspended with re-submit
  'Rts'     => :suspended,    # all suspended with re-submit
  'S'       => :suspended,    # queue suspended
  'T'       => :suspended,    # queue suspended by alarm
  'dRS'     => :completed,    # all running and suspended states with deletion
  'dRT'     => :completed,    # all running and suspended states with deletion
  'dRr'     => :completed,    # all running and suspended states with deletion
  'dRs'     => :completed,    # all running and suspended states with deletion
  'dRt'     => :completed,    # all running and suspended states with deletion
  'dS'      => :completed,    # all running and suspended states with deletion
  'dT'      => :completed,    # all running and suspended states with deletion
  'dr'      => :completed,    # all running and suspended states with deletion
  'ds'      => :completed,    # all running and suspended states with deletion
  'dt'      => :completed,    # all running and suspended states with deletion
  'hRwq'    => :queued_held,  # pending, system hold, re-queue
  'hqw'     => :queued_held,  # pending, system hold
  'qw'      => :queued,       # pending
  'r'       => :running,      # running
  's'       => :suspended,    # suspended
  't'       => :running,      # transferring
  'tS'      => :suspended,    # queue suspended
  'tT'      => :suspended,    # queue suspended by alarm
  'ts'      => :suspended,    # obsuspended
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Batch

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Batch.

Parameters:

  • opts (#to_h)

    the options defining this adapter

See Also:



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 34

def initialize(config)
  @cluster          = config.fetch(:cluster, nil)
  @bin              = Pathname.new(config.fetch(:bin, nil).to_s)
  @sge_root         = Pathname.new(config[:sge_root] || ENV['SGE_ROOT'] || "/var/lib/gridengine")
  @bin_overrides    = config.fetch(:bin_overrides, {})
  @submit_host      = config.fetch(:submit_host, "")
  @strict_host_checking = config.fetch(:strict_host_checking, true)

  # FIXME: hack as this affects env of the process!
  ENV['SGE_ROOT'] = @sge_root.to_s

  if config[:libdrmaa_path]
    load_drmaa(config[:libdrmaa_path])
    @can_use_drmaa    = true
  else
    @can_use_drmaa    = false
  end

  @helper = OodCore::Job::Adapters::Sge::Helper.new
end

Instance Attribute Details

#binObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin
  @bin
end

#bin_overridesObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def bin_overrides
  @bin_overrides
end

#clusterObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def cluster
  @cluster
end

#confObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def conf
  @conf
end

#helperObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def helper
  @helper
end

#strict_host_checkingObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def strict_host_checking
  @strict_host_checking
end

#submit_hostObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



18
19
20
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 18

def submit_host
  @submit_host
end

Instance Method Details

#call(cmd, *args, env: {}, stdin: "", chdir: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call a forked SGE command for a given batch server



169
170
171
172
173
174
175
176
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 169

def call(cmd, *args, env: {}, stdin: "", chdir: nil)
  cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  cmd, args = OodCore::Job::Adapters::Helper.ssh_wrap(submit_host, cmd, args, strict_host_checking, env)
  chdir ||= "."
  o, e, s = Open3.capture3(env, cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s, chdir: chdir.to_s)
  s.success? ? o : raise(Error, e)
end

#can_use_drmaa?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


136
137
138
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 136

def can_use_drmaa?
  @can_use_drmaa
end

#delete(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qdel

Parameters:

  • job_id (#to_s)


157
158
159
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 157

def delete(job_id)
  call('qdel', job_id)
end

#get_all(owner: nil) ⇒ Array<OodCore::Job::Info>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get OodCore::Job::Info for every enqueued job, optionally filtering on owner

Parameters:

  • owner (#to_s) (defaults to: nil)

    the owner or owner list

Returns:



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 64

def get_all(owner: nil)
  listener = QstatXmlRListener.new
  argv = ['qstat', '-r', '-xml']
  argv.concat ['-u', owner] unless owner.nil?
  REXML::Parsers::StreamParser.new(call(*argv), listener).parse

  listener.parsed_jobs.map{
    |job_hash| OodCore::Job::Info.new(
      **post_process_qstat_job_hash(job_hash)
    )
  }
end

#get_info_enqueued_job(job_id) ⇒ OodCore::Job::Info

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get OodCore::Job::Info for a job_id that may still be in the queue

If libdrmaa is not loaded then we cannot use DRMAA. Using DRMAA provides better job status and should always be chosen if it is possible.

When qstat is called in XML mode for a job id that is not in the queue invalid XML is returned. The second line of the invalid XML contains the string ā€˜<unknown_jobsā€™ which will be used to recognize this case.

Parameters:

  • job_id (#to_s)

Returns:



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 88

def get_info_enqueued_job(job_id)
  job_info = OodCore::Job::Info.new(id: job_id.to_s, status: :completed)
  argv = ['qstat', '-r', '-xml', '-j', job_id.to_s]

  begin
    results = call(*argv)
    listener = QstatXmlJRListener.new
    REXML::Parsers::StreamParser.new(results, listener).parse

    job_hash = listener.parsed_job

    if job_hash[:id]
      update_job_hash_status!(job_hash)
    else
      job_hash[:id] = job_id
      job_hash[:status] = :completed
    end

    job_info = OodCore::Job::Info.new(**job_hash)
  rescue REXML::ParseException => e
    # If the error is something other than a job not being found by qstat re-raise the error
    unless results =~ /unknown_jobs/
      raise e, "REXML::ParseException error and command '#{argv.join(' ')}' produced results that didn't contain string 'unknown_jobs'. ParseException: #{e.message}"
    end
  rescue StandardError => e
    # Note that DRMAA is not guaranteed to be defined, hence the tests
    raise e unless ( can_use_drmaa? && e.is_a?(DRMAA::DRMAAInvalidJobError))  # raised when job is not found
  end

  job_info
end

#get_status_from_drmaa?(job_hash) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


130
131
132
133
134
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 130

def get_status_from_drmaa?(job_hash)
  # DRMAA does not recognize the parent task in job arrays
  # e.g. 123 is invalid if it is an array job, while 123.4 is valid
  can_use_drmaa? && job_hash[:tasks].empty?
end

#get_status_from_drmma(job_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get the job status using DRMAA



235
236
237
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 235

def get_status_from_drmma(job_id)
  translate_drmaa_state(DRMAA::SessionSingleton.instance.job_ps(job_id.to_s))
end

#hold(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qhold

Parameters:

  • job_id (#to_s)


143
144
145
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 143

def hold(job_id)
  call('qhold', job_id)
end

#load_drmaa(libdrmaa_path) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



55
56
57
58
59
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 55

def load_drmaa(libdrmaa_path)
  FFI_DRMAA.libdrmaa_path = libdrmaa_path if libdrmaa_path
  require "ood_core/job/adapters/drmaa"
  require "ood_core/refinements/drmaa_extensions"
end

#post_process_qstat_job_hash(job_hash) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



222
223
224
225
226
227
228
229
230
231
232
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 222

def post_process_qstat_job_hash(job_hash)
  # dispatch is not set if the job is not running
  if ! job_hash.key?(:wallclock_time)
    job_hash[:wallclock_time] = job_hash.key?(:dispatch_time) ? Time.now.to_i - job_hash[:dispatch_time] : 0
  end

  job_hash[:status] = translate_sge_state(job_hash[:status])
  update_job_hash_status!(job_hash)

  job_hash
end

#release(job_id) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Call qrls

Parameters:

  • job_id (#to_s)


150
151
152
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 150

def release(job_id)
  call('qrls', job_id)
end

#submit(content, args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call qsub with arguments and the scripts content

Parameters:

  • job_id (#to_s)


164
165
166
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 164

def submit(content, args)
    @helper.parse_job_id_from_qsub(call('qsub', *args, :stdin => content))
end

#translate_drmaa_state(drmaa_state_code) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



218
219
220
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 218

def translate_drmaa_state(drmaa_state_code)
  DRMAA::DRMMA_TO_OOD_STATE_MAP.fetch(drmaa_state_code, :undetermined)
end

#translate_sge_state(sge_state_code) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



214
215
216
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 214

def translate_sge_state(sge_state_code)
  STATE_MAP.fetch(sge_state_code, :undetermined)
end

#update_job_hash_status!(job_hash) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



120
121
122
123
124
125
126
127
128
# File 'lib/ood_core/job/adapters/sge/batch.rb', line 120

def update_job_hash_status!(job_hash)
  if get_status_from_drmaa?(job_hash)
    begin
      job_hash[:status] = get_status_from_drmma(job_hash[:id])
    rescue DRMAA::DRMAAException => e
      # log DRMAA error?
    end
  end
end