Class: Ripe::WorkerController::Syncer

Inherits:
Object
  • Object
show all
Defined in:
lib/ripe/worker_controller/syncer.rb

Overview

This class controls worker syncing with the compute cluster queue.

See Also:

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Syncer

Synchronize the status of jobs with the internal list of workers.



24
25
26
27
28
29
30
31
# File 'lib/ripe/worker_controller/syncer.rb', line 24

# Build a syncer and immediately reconcile the worker records with the queue.
#
# The steps run in a fixed order: the completed-job lookup reads the running
# jobs fetched by the first step, and each update step consumes the list
# produced by the fetch step before it.
def initialize
  @workers = []

  [
    :fetch_running_jobs,
    :update_running_workers,
    :fetch_completed_jobs,
    :update_completed_workers,
  ].each { |step| send(step) }
end

Instance Attribute Details

#completed_jobs ⇒ Array<DB::Worker> (readonly)

a list of completed workers

Returns:

  • (Array<DB::Worker>)

    the current value of completed_jobs



17
18
19
# File 'lib/ripe/worker_controller/syncer.rb', line 17

def completed_jobs
  @completed_jobs
end

#running_jobs ⇒ Array<Hash<Symbol, String>> (readonly)

a list of running jobs, each described by its moab_id, time and status.

Returns:

  • (Array<Hash<Symbol, String>>)

    the current value of running_jobs



17
18
19
# File 'lib/ripe/worker_controller/syncer.rb', line 17

# Jobs currently on the queue for this user, as gathered by #fetch_running_jobs.
# Each entry is a Hash with :moab_id, :time and :status keys.
# @return [Array<Hash{Symbol => Object}>, nil]
def running_jobs; @running_jobs; end

#workers ⇒ Array<DB::Worker> (readonly)

list of workers that have been updated (or completed)

Returns:

  • (Array<DB::Worker>)

    the current value of workers



17
18
19
# File 'lib/ripe/worker_controller/syncer.rb', line 17

# Every worker record touched by this sync, running ones first, then completed ones.
# @return [Array<DB::Worker>]
def workers; @workers; end

Instance Method Details

#fetch_completed_jobs ⇒ void

This method returns an undefined value.

Fetch a list of completed workers from the running jobs: these are workers that were previously marked as active, blocked or idle but can no longer be found on the compute cluster queue. Cancelled workers and workers belonging to other users are excluded.



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/ripe/worker_controller/syncer.rb', line 85

# Collect workers that the DB still marks as active, idle or blocked but whose
# job is no longer on the queue; they are stored in @completed_workers.
#
# Cancelled workers and workers owned by other users are skipped.
#
# @return [Array<DB::Worker>] the completed workers (also kept in @completed_workers)
def fetch_completed_jobs
  # IDs scraped from showq are Strings; DB::Worker#moab_id may be stored with
  # a different type depending on the schema, so compare both sides as strings.
  running_job_ids = @running_jobs.map { |job| job[:moab_id].to_s }

  running_workers = DB::Worker.where('status in (:statuses)',
                                     :statuses => ['active', 'idle', 'blocked'])

  # The current user cannot change during the filter; resolve it once instead
  # of spawning a `whoami` subprocess for every candidate worker.
  current_user = `whoami`.chomp

  @completed_workers = running_workers.select do |worker|
    !running_job_ids.include?(worker.moab_id.to_s) &&
      !worker.cancelled? && worker.user == current_user
  end
end

#fetch_running_jobs ⇒ void

This method returns an undefined value.

Fetch status for all running jobs.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/ripe/worker_controller/syncer.rb', line 38

# Query showq once per queue state (idle, blocked, running) for the current
# user and store the parsed jobs in @running_jobs, in that state order.
#
# Each job is a Hash with :moab_id and :time scraped from the showq line
# (nil when the line does not match) and :status set to the queue state.
#
# @return [Array<Hash{Symbol => Object}>] the jobs, also kept in @running_jobs
def fetch_running_jobs
  showq_flags = { idle: '-i', blocked: '-b', active:  '-r' }

  @running_jobs = showq_flags.flat_map do |state, flag|
    output_lines = `showq -u $(whoami) #{flag} | grep $(whoami)`.split("\n")
    output_lines.map do |line|
      id      = line[/^([0-9]+) /, 1]
      elapsed = line[/  ([0-9]{1,2}(\:[0-9]{2})+)  /, 1]
      { moab_id: id, time: elapsed, status: state }
    end
  end
end

#update_completed_workers ⇒ void

This method returns an undefined value.

Mark the completed workers as completed and record the resource usage (CPU time, memory, walltime, exit code, host) parsed from each worker's stdout file.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/ripe/worker_controller/syncer.rb', line 102

# Mark every worker found by #fetch_completed_jobs as completed, record the
# resource usage reported in its stdout file, and append it to @workers.
#
# When the stdout path is unset or the file does not exist, every parsed
# field is stored as nil.
#
# @return [Array<DB::Worker>] the updated @workers list
def update_completed_workers
  @workers += @completed_workers.map do |worker|
    path = worker.stdout
    # File.exists? no longer exists as of Ruby 3.2; File.read closes the
    # file it opens instead of leaving a File handle for the GC to reap.
    stdout = (path && File.exist?(path)) ? File.read(path) : ""
    worker.update({
      cpu_used:    stdout[/Resources:[ \t]*cput=([0-9]{1,2}(\:[0-9]{2})+),/, 1],
      exit_code:   stdout[/Exit code:[ \t]*(.*)$/, 1],
      host:        stdout[/Nodes:[ \t]*(.*)$/, 1],
      memory_used: stdout[/Resources:.*,mem=([0-9]*[a-zA-Z]*),/, 1],
      time:        stdout[/Resources:.*,walltime=([0-9]{1,2}(\:[0-9]{2})+)$/, 1],
      status:      :completed,
    })
    worker
  end
end

#update_running_workers ⇒ void

This method returns an undefined value.

Update the status of running workers from the running jobs.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/ripe/worker_controller/syncer.rb', line 58

# Refresh the DB record of every running job that belongs to this ripe repo
# and append those workers to @workers.
#
# The elapsed time is always updated. The host (via `checkjob`) and status
# are only refreshed when the worker's queue state changed, and never for
# cancelled workers.
#
# @return [Array<DB::Worker>] the updated @workers list
def update_running_workers
  workers = @running_jobs.map do |job|
    worker = DB::Worker.find_by(moab_id: job[:moab_id])
    # Jobs unknown to this repo yield nil here and are compacted away below.
    next unless worker

    worker.update(time: job[:time])

    # job[:status] is a Symbol (:idle/:blocked/:active), while worker.status is
    # presumably read back from the DB as a String (fetch_completed_jobs queries
    # it with string values). Compare as strings so an unchanged state is
    # recognised and does not trigger a redundant checkjob call.
    unless ['cancelled', job[:status].to_s].include?(worker.status.to_s)
      checkjob = `checkjob #{job[:moab_id]}`
      worker.update({
        host:      checkjob[/Allocated Nodes:\n\[(.*):[0-9]+\]\n/, 1],
        # Queued jobs that appear become either idle, blocked or active
        status:    job[:status],
      })
    end
    worker
  end
  @workers += workers.compact
end