Class: Ripe::WorkerController::Syncer
- Inherits:
- Object
- Defined in:
- lib/ripe/worker_controller/syncer.rb
Overview
This class controls worker syncing with the compute cluster queue.
Instance Attribute Summary
-
#completed_jobs ⇒ Array<DB::Worker>
readonly
a list of completed workers.
-
#running_jobs ⇒ Array<Hash<Symbol, String>>
readonly
a list of running jobs along with certain parameters (moab_id, time and status).
-
#workers ⇒ Array<DB::Worker>
readonly
list of workers that have been updated (or completed).
Instance Method Summary
-
#fetch_completed_jobs ⇒ void
Fetch a list of completed workers from the running jobs: these are jobs that were previously marked as active, blocked or idle and can no longer be found on the compute cluster queue.
-
#fetch_running_jobs ⇒ void
Fetch status for all running jobs.
-
#initialize ⇒ Syncer
constructor
Synchronize the status of jobs with the internal list of workers.
-
#update_completed_workers ⇒ void
Update the status of completed workers from the running jobs.
-
#update_running_workers ⇒ void
Update the status of running workers from the running jobs.
Constructor Details
#initialize ⇒ Syncer
Synchronize the status of jobs with the internal list of workers.
    # File 'lib/ripe/worker_controller/syncer.rb', line 24
    def initialize
      @workers = []
      fetch_running_jobs
      update_running_workers
      fetch_completed_jobs
      update_completed_workers
    end
Instance Attribute Details
#completed_jobs ⇒ Array<DB::Worker> (readonly)
a list of completed workers
    # File 'lib/ripe/worker_controller/syncer.rb', line 17
    def completed_jobs
      @completed_jobs
    end
#running_jobs ⇒ Array<Hash<Symbol, String>> (readonly)
a list of running jobs along with certain parameters (moab_id, time and status).
    # File 'lib/ripe/worker_controller/syncer.rb', line 17
    def running_jobs
      @running_jobs
    end
#workers ⇒ Array<DB::Worker> (readonly)
list of workers that have been updated (or completed)
    # File 'lib/ripe/worker_controller/syncer.rb', line 17
    def workers
      @workers
    end
Instance Method Details
#fetch_completed_jobs ⇒ void
This method returns an undefined value.
Fetch a list of completed workers from the running jobs: these are jobs that were previously marked as active, blocked or idle and can no longer be found on the compute cluster queue.
    # File 'lib/ripe/worker_controller/syncer.rb', line 85
    def fetch_completed_jobs
      running_job_ids = @running_jobs.map { |job| job[:moab_id] }
      running_workers = DB::Worker.where('status in (:statuses)',
                                         :statuses => ['active', 'idle', 'blocked'])
      # The result is stored in @completed_workers (not @completed_jobs,
      # which is what the completed_jobs reader returns).
      @completed_workers = running_workers.select do |worker|
        !running_job_ids.include?(worker.moab_id) &&
          !worker.cancelled? &&
          worker.user == `whoami`.chomp
      end
    end
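The selection above amounts to "workers the database still considers queued, minus the ones the queue still reports". A minimal sketch of that filter follows, assuming a plain Struct as a stand-in for DB::Worker and a hypothetical completed_workers helper; the user name is passed in rather than read from `whoami` so the example runs anywhere.

```ruby
# Stand-in for DB::Worker, for illustration only.
Worker = Struct.new(:moab_id, :status, :user) do
  def cancelled?
    status == 'cancelled'
  end
end

# Hypothetical helper mirroring the selection in fetch_completed_jobs.
def completed_workers(running_jobs, candidate_workers, current_user)
  running_ids = running_jobs.map { |job| job[:moab_id] }
  candidate_workers.select do |worker|
    !running_ids.include?(worker.moab_id) &&
      !worker.cancelled? &&
      worker.user == current_user
  end
end

# Made-up data: one job is still on the queue, two are gone.
running_jobs = [{ moab_id: '101', time: '0:10:00', status: :active }]
workers = [
  Worker.new('101', 'active', 'alice'), # still on the queue
  Worker.new('102', 'idle',   'alice'), # gone from the queue: completed
  Worker.new('103', 'active', 'bob'),   # gone, but owned by another user
]

done = completed_workers(running_jobs, workers, 'alice')
puts done.map(&:moab_id).inspect
```

Only worker 102 is selected: 101 is still queued and 103 belongs to a different user.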
#fetch_running_jobs ⇒ void
This method returns an undefined value.
Fetch status for all running jobs.
    # File 'lib/ripe/worker_controller/syncer.rb', line 38
    def fetch_running_jobs
      lists = {idle: '-i', blocked: '-b', active: '-r'}
      lists = lists.map do |status, op|
        showq = `showq -u $(whoami) #{op} | grep $(whoami)`.split("\n")
        showq.map do |job|
          {
            moab_id: job[/^([0-9]+) /, 1],
            time:    job[/ ([0-9]{1,2}(\:[0-9]{2})+) /, 1],
            status:  status,
          }
        end
      end
      @running_jobs = lists.inject(&:+)
    end
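To illustrate the parsing step above, the sketch below applies the same two regular expressions to a single queue line. The sample line is made up (real showq output may use different columns), and parse_showq_line is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: applies the regexes from fetch_running_jobs to one line.
def parse_showq_line(line, status)
  {
    moab_id: line[/^([0-9]+) /, 1],                    # leading numeric job id
    time:    line[/ ([0-9]{1,2}(\:[0-9]{2})+) /, 1],   # first h:mm[:ss] field
    status:  status,
  }
end

# Made-up sample line; not captured from a real cluster.
sample = "12345   alice   Running   8   1:23:45   Mon Jan  5 10:00:00"

job = parse_showq_line(sample, :active)
puts job.inspect
```

The time pattern requires a space on both sides of the field, so in this sample it picks up the fifth column rather than the timestamp at the end of the line.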
#update_completed_workers ⇒ void
This method returns an undefined value.
Update the status of completed workers from the running jobs.
    # File 'lib/ripe/worker_controller/syncer.rb', line 102
    def update_completed_workers
      @workers += @completed_workers.map do |worker|
        # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
        stdout = File.exist?(worker.stdout) ? File.read(worker.stdout) : ""
        worker.update({
          cpu_used:    stdout[/Resources:[ \t]*cput=([0-9]{1,2}(\:[0-9]{2})+),/, 1],
          exit_code:   stdout[/Exit code:[ \t]*(.*)$/, 1],
          host:        stdout[/Nodes:[ \t]*(.*)$/, 1],
          memory_used: stdout[/Resources:.*,mem=([0-9]*[a-zA-Z]*),/, 1],
          time:        stdout[/Resources:.*,walltime=([0-9]{1,2}(\:[0-9]{2})+)$/, 1],
          status:      :completed,
        })
        worker
      end
    end
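The field extraction above can be tried in isolation. The sketch below runs the same regular expressions over a made-up job epilogue; the layout of that text is an assumption chosen to match the patterns, not output captured from a real scheduler.

```ruby
# Made-up job epilogue; the layout is an assumption chosen to match the regexes.
stdout = <<~LOG
  Exit code:  0
  Nodes:      node042
  Resources:  cput=00:12:34,mem=2048kb,vmem=4096kb,walltime=00:15:00
LOG

# Same patterns as in update_completed_workers; each returns nil on no match.
fields = {
  cpu_used:    stdout[/Resources:[ \t]*cput=([0-9]{1,2}(\:[0-9]{2})+),/, 1],
  exit_code:   stdout[/Exit code:[ \t]*(.*)$/, 1],
  host:        stdout[/Nodes:[ \t]*(.*)$/, 1],
  memory_used: stdout[/Resources:.*,mem=([0-9]*[a-zA-Z]*),/, 1],
  time:        stdout[/Resources:.*,walltime=([0-9]{1,2}(\:[0-9]{2})+)$/, 1],
}

puts fields.inspect

# With an empty string (the fallback when the stdout file is missing),
# every field is nil.
empty = ""[/Exit code:[ \t]*(.*)$/, 1]
puts empty.inspect
```

Because String#[] with a regex returns nil when nothing matches, a missing or incomplete stdout file leaves the corresponding fields unset rather than raising.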
#update_running_workers ⇒ void
This method returns an undefined value.
Update the status of running workers from the running jobs.
    # File 'lib/ripe/worker_controller/syncer.rb', line 58
    def update_running_workers
      workers = @running_jobs.map do |job|
        worker = DB::Worker.find_by(moab_id: job[:moab_id])
        if worker
          worker.update(time: job[:time])
          unless ['cancelled', job[:status]].include?(worker.status)
            checkjob = `checkjob #{job[:moab_id]}`
            worker.update({
              host: checkjob[/Allocated Nodes:\n\[(.*):[0-9]+\]\n/, 1],
              # Queued jobs that appear become either idle, blocked or active
              status: job[:status],
            })
          end
        end
        worker
        # This is +nil+ if worker is not found (other running jobs
        # that are independent from the current ripe repo).
      end
      @workers += workers.compact
    end
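As a rough illustration of the two checks in the method above, the sketch below extracts the host from a made-up checkjob excerpt and reproduces the guard that decides whether a worker's status needs updating. The excerpt and the needs_status_update? helper are assumptions for illustration, and statuses are compared as plain strings here.

```ruby
# Made-up excerpt of checkjob output; real output contains many more fields.
checkjob = "State: Running\nAllocated Nodes:\n[node042:8]\n\nIWD: /home/alice\n"

# Same pattern as in update_running_workers: host name before ":<count>".
host = checkjob[/Allocated Nodes:\n\[(.*):[0-9]+\]\n/, 1]
puts host.inspect

# Hypothetical helper mirroring the guard: skip cancelled workers and
# workers whose recorded status already matches the queue.
def needs_status_update?(worker_status, job_status)
  !['cancelled', job_status].include?(worker_status)
end

puts needs_status_update?('idle', 'active')      # status changed on the queue
puts needs_status_update?('active', 'active')    # already up to date
puts needs_status_update?('cancelled', 'active') # cancelled workers are left alone
```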