Class: MiGA::Daemon
Includes: Base
Defined in: lib/miga/daemon.rb, lib/miga/daemon/base.rb
Overview
MiGA Daemons handling job submissions.
Defined Under Namespace
Modules: Base
Constant Summary
Constants included from MiGA
CITATION, VERSION, VERSION_DATE, VERSION_NAME
Instance Attribute Summary
- #jobs_running ⇒ Object (readonly)
  Array of jobs currently running.
- #jobs_to_run ⇒ Object (readonly)
  Array of jobs next to be executed.
- #loop_i ⇒ Object (readonly)
  Integer indicating the current iteration.
- #options ⇒ Object (readonly)
  Options used to set up the daemon.
- #project ⇒ Object (readonly)
  MiGA::Project in which the daemon is running.
Class Method Summary
- .last_alive(project) ⇒ Object
  When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.
Instance Method Summary
- #check_datasets ⇒ Object
  Traverse datasets.
- #check_project ⇒ Object
  Check if all reference datasets are pre-processed.
- #daemon(task, opts = []) ⇒ Object
  Launches the task with options opts (as command-line arguments).
- #declare_alive ⇒ Object
  Tell the world that you’re alive.
- #default_options ⇒ Object
  Returns Hash containing the default options for the daemon.
- #flush! ⇒ Object
  Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.
- #get_job(job, ds = nil) ⇒ Object
  Get the task with key symbol job in dataset ds.
- #in_loop ⇒ Object
  Run one loop step.
- #initialize(project) ⇒ Daemon (constructor)
  Initialize an inactive daemon for the MiGA::Project project.
- #last_alive ⇒ Object
  When was the last time a daemon for the current project was seen active? Returns DateTime.
- #load_status ⇒ Object
  Load the status of a previous instance.
- #purge! ⇒ Object
  Remove dead jobs.
- #queue_job(job, ds = nil) ⇒ Object
  Add the task to the internal queue with symbol key job.
- #report_status ⇒ Object
  Report status in a JSON file.
- #say(*opts) ⇒ Object
  Send a datestamped message to the log.
- #terminate ⇒ Object
  Terminates a daemon.
Methods included from Base
#latency, #maxjobs, #ppn, #restart, #runopts, #shutdown_when_done?, #start, #status, #stop
Methods inherited from MiGA
CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?
Methods included from Common::Path
Methods included from Common::Format
#clean_fasta_file, #seqs_length, #tabulate
Constructor Details
#initialize(project) ⇒ Daemon
Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon.
    # File 'lib/miga/daemon.rb', line 39
    def initialize(project)
      $_MIGA_DAEMON_LAIR << self
      @project = project
      @runopts = JSON.parse(
        File.read(File.expand_path('daemon/daemon.json', project.path)),
        symbolize_names: true)
      @jobs_to_run = []
      @jobs_running = []
      @loop_i = -1
    end
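The constructor reads the run options from daemon/daemon.json with Symbol keys. The following is a minimal sketch of that step using only the standard library. The helper names `load_runopts` and `runopts_demo` and the sample values are assumptions made for illustration; the actual contents of daemon.json depend on how the project was configured.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical settings, for illustration only.
SAMPLE_DAEMON_JSON = <<~JSON
  {
    "type": "bash",
    "latency": 30,
    "maxjobs": 4,
    "ppn": 2
  }
JSON

# Read daemon/daemon.json under +project_path+ and return a Hash with
# Symbol keys, as the constructor does for @runopts.
def load_runopts(project_path)
  file = File.expand_path('daemon/daemon.json', project_path)
  JSON.parse(File.read(file), symbolize_names: true)
end

# Build a throwaway project directory and load the sample settings from it.
def runopts_demo
  Dir.mktmpdir do |dir|
    Dir.mkdir(File.join(dir, 'daemon'))
    File.write(File.join(dir, 'daemon', 'daemon.json'), SAMPLE_DAEMON_JSON)
    load_runopts(dir)
  end
end

p runopts_demo
```

With `symbolize_names: true` only the keys become Symbols; values such as "bash" remain Strings.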
Instance Attribute Details
#jobs_running ⇒ Object (readonly)
Array of jobs currently running.
    # File 'lib/miga/daemon.rb', line 32
    def jobs_running
      @jobs_running
    end
#jobs_to_run ⇒ Object (readonly)
Array of jobs next to be executed.
    # File 'lib/miga/daemon.rb', line 30
    def jobs_to_run
      @jobs_to_run
    end
#loop_i ⇒ Object (readonly)
Integer indicating the current iteration.
    # File 'lib/miga/daemon.rb', line 34
    def loop_i
      @loop_i
    end
#options ⇒ Object (readonly)
Options used to set up the daemon.
    # File 'lib/miga/daemon.rb', line 28
    def options
      @options
    end
#project ⇒ Object (readonly)
MiGA::Project in which the daemon is running.
    # File 'lib/miga/daemon.rb', line 26
    def project
      @project
    end
Class Method Details
.last_alive(project) ⇒ Object
When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime, or nil if the file daemon/alive does not exist.
    # File 'lib/miga/daemon.rb', line 16
    def self.last_alive(project)
      f = File.expand_path('daemon/alive', project.path)
      return nil unless File.exist? f
      DateTime.parse(File.read(f))
    end
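The alive file is a plain-text timestamp that #declare_alive writes and .last_alive reads back. The sketch below reproduces that round trip with standalone helpers that take a directory path instead of a MiGA::Project; the helper names and the temporary directory are assumptions made for illustration.

```ruby
require 'date'
require 'tmpdir'

# Write the current time to <project_path>/daemon/alive.
def declare_alive(project_path)
  File.write(File.expand_path('daemon/alive', project_path), Time.now.to_s)
end

# Return the recorded time as a DateTime, or nil if the file is absent.
def last_alive(project_path)
  f = File.expand_path('daemon/alive', project_path)
  return nil unless File.exist?(f)
  DateTime.parse(File.read(f))
end

# Check the timestamp before and after declaring the daemon alive.
def alive_demo
  Dir.mktmpdir do |dir|
    Dir.mkdir(File.join(dir, 'daemon'))
    before = last_alive(dir)
    declare_alive(dir)
    [before, last_alive(dir)]
  end
end

before, after = alive_demo
puts "before: #{before.inspect}"
puts "after:  #{after}"
```

Callers should be prepared for the nil case, which occurs before the first loop iteration and after #terminate removes the file.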
Instance Method Details
#check_datasets ⇒ Object
Traverse datasets, queueing the next pre-processing task of each. If a dataset is listed but not loaded, the project is reloaded.
    # File 'lib/miga/daemon.rb', line 114
    def check_datasets
      project.each_dataset do |n, ds|
        if ds.nil?
          say "Warning: Dataset #{n} listed but not loaded, reloading project"
          project.load
        else
          to_run = ds.next_preprocessing(true)
          queue_job(to_run, ds) unless to_run.nil?
        end
      end
    end
#check_project ⇒ Object
Check if all reference datasets are pre-processed. If so, queue the next pending project-level task.
    # File 'lib/miga/daemon.rb', line 129
    def check_project
      return if project.dataset_names.empty?
      return unless project.done_preprocessing?(false)
      to_run = project.next_distances(true)
      to_run = project.next_inclade(true) if to_run.nil?
      queue_job(to_run) unless to_run.nil?
    end
#daemon(task, opts = []) ⇒ Object
Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.
    # File 'lib/miga/daemon.rb', line 67
    def daemon(task, opts=[])
      options = default_options
      opts.unshift(task)
      options[:ARGV] = opts
      Daemons.run_proc("MiGA:#{project.name}", options) do
        loop { break unless in_loop }
      end
    end
#declare_alive ⇒ Object
Tell the world that you’re alive.
    # File 'lib/miga/daemon.rb', line 78
    def declare_alive
      f = File.open(File.expand_path('daemon/alive', project.path), 'w')
      f.print Time.now.to_s
      f.close
    end
#default_options ⇒ Object
Returns Hash containing the default options for the daemon.
    # File 'lib/miga/daemon.rb', line 59
    def default_options
      { dir_mode: :normal, dir: File.expand_path('daemon', project.path),
        multiple: false, log_output: true }
    end
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.
    # File 'lib/miga/daemon.rb', line 187
    def flush!
      # Check for finished jobs
      @jobs_running.select! do |job|
        r = (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false)
        say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless r.nil?
        r.nil?
      end
      # Avoid single datasets hogging resources
      @jobs_to_run.rotate! rand(jobs_to_run.size)
      # Launch as many +jobs_to_run+ as possible
      while jobs_running.size < maxjobs
        break if jobs_to_run.empty?
        launch_job @jobs_to_run.shift
      end
    end
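The launching half of #flush! can be sketched in isolation as follows. The helper `flush_queue`, the job names, and the pid counter are assumptions made for illustration: the block stands in for the actual job launch, and the random rotation of the queue is left out so that the result is deterministic.

```ruby
# Move jobs from +to_run+ into +running+ until +maxjobs+ jobs are running
# or the queue is empty. The block stands in for launching a job and must
# return its pid.
def flush_queue(to_run, running, maxjobs)
  while running.size < maxjobs
    break if to_run.empty?
    job = to_run.shift
    running << job.merge(pid: yield(job))
  end
  running
end

# Hypothetical job names and pids.
to_run = [{ job: :trimmed_reads }, { job: :assembly }, { job: :cds }]
running = [{ job: :ssu, pid: 100 }]
next_pid = 100
flush_queue(to_run, running, 3) { |_job| next_pid += 1 }

puts "running: #{running.map { |j| j[:job] }.inspect}"
puts "waiting: #{to_run.map { |j| j[:job] }.inspect}"
```

With one job already running and a limit of three, two queued jobs are launched and the third stays in the queue for a later iteration.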
#get_job(job, ds = nil) ⇒ Object
Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.
    # File 'lib/miga/daemon.rb', line 174
    def get_job(job, ds=nil)
      (jobs_to_run + jobs_running).find do |j|
        if ds==nil
          j[:ds].nil? and j[:job]==job
        else
          (! j[:ds].nil?) and j[:ds].name==ds.name and j[:job]==job
        end
      end
    end
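A standalone sketch of the lookup rule may help: a project-wide job matches only when ds is nil, and a dataset job matches on both the dataset name and the job key. `SketchDataset` and `find_job` are stand-ins invented for this example, and the job names are hypothetical.

```ruby
# Stand-in for a dataset object; only #name is needed for the lookup.
SketchDataset = Struct.new(:name)

# Find the entry for +job+ in +jobs+. With ds = nil only project-wide
# entries (those without a dataset) are considered.
def find_job(jobs, job, ds = nil)
  jobs.find do |j|
    if ds.nil?
      j[:ds].nil? && j[:job] == job
    else
      !j[:ds].nil? && j[:ds].name == ds.name && j[:job] == job
    end
  end
end

ds1 = SketchDataset.new('ds1')
ds2 = SketchDataset.new('ds2')
jobs = [
  { ds: ds1, job: :assembly },
  { ds: nil, job: :distances }
]

p find_job(jobs, :assembly, ds1)  # the ds1 entry
p find_job(jobs, :assembly, ds2)  # nil, since ds2 has no such job queued
p find_job(jobs, :distances)      # the project-wide entry
```

Because the comparison uses the dataset name rather than object identity, a reloaded dataset object still matches its previously queued jobs.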
#in_loop ⇒ Object
Run one loop step. Returns a Boolean indicating if the loop should continue.
    # File 'lib/miga/daemon.rb', line 213
    def in_loop
      declare_alive
      project.load
      if loop_i == -1
        say '-----------------------------------'
        say 'MiGA:%s launched.' % project.name
        say '-----------------------------------'
        load_status
        @loop_i = 0
      end
      @loop_i += 1
      check_datasets
      check_project
      flush!
      if loop_i==4
        say 'Housekeeping for sanity'
        @loop_i = 0
        purge!
      end
      report_status
      sleep(latency)
      if shutdown_when_done? and jobs_running.size+jobs_to_run.size == 0
        say 'Nothing else to do, shutting down.'
        return false
      end
      true
    end
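The counter logic of #in_loop, together with the driver loop used by #daemon, can be traced with a small stand-in class. `LoopSketch` is hypothetical: it replaces the real work with a countdown of pending steps and only records on which calls housekeeping would run.

```ruby
# Stand-in for the daemon loop: counts calls, resets the iteration counter
# every fourth step, and stops once no work is pending.
class LoopSketch
  attr_reader :loop_i, :housekeeping_at

  def initialize(pending)
    @pending = pending       # number of steps that still have work to do
    @loop_i = -1             # -1 marks a daemon that has not looped yet
    @calls = 0
    @housekeeping_at = []
  end

  # One loop step; returns false when the loop should stop.
  def in_loop
    @calls += 1
    @loop_i = 0 if @loop_i == -1   # first-run initialization
    @loop_i += 1
    if @loop_i == 4
      @housekeeping_at << @calls   # this is where purge! would run
      @loop_i = 0
    end
    @pending -= 1
    @pending > 0
  end

  # Same driver as in #daemon: iterate until a step returns false.
  def run
    loop { break unless in_loop }
    @calls
  end
end

sketch = LoopSketch.new(9)
puts "steps: #{sketch.run}"
puts "housekeeping on calls: #{sketch.housekeeping_at.inspect}"
```

In this trace housekeeping falls on every fourth call, so the interval between purges in the real daemon is roughly four times the latency plus the time spent in each step.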
#last_alive ⇒ Object
When was the last time a daemon for the current project was seen active? Returns DateTime, or nil if the file daemon/alive does not exist.
    # File 'lib/miga/daemon.rb', line 53
    def last_alive
      MiGA::Daemon.last_alive project
    end
#load_status ⇒ Object
Load the status of a previous instance.
    # File 'lib/miga/daemon.rb', line 95
    def load_status
      f_path = File.expand_path('daemon/status.json', project.path)
      return unless File.size? f_path
      say 'Loading previous status in daemon/status.json:'
      status = JSON.parse(File.read(f_path), symbolize_names: true)
      status.keys.each do |i|
        status[i].map! do |j|
          j.tap { |k| k[:ds] = project.dataset(k[:ds_name]) unless k[:ds].nil? }
        end
      end
      @jobs_running = status[:jobs_running]
      @jobs_to_run = status[:jobs_to_run]
      purge!
      say "- jobs running: #{@jobs_running.size}"
      say "- jobs to run: #{@jobs_to_run.size}"
    end
#purge! ⇒ Object
Remove dead jobs.
    # File 'lib/miga/daemon.rb', line 205
    def purge!
      @jobs_running.select! do |job|
        `#{sprintf(runopts(:alive), job[:pid])}`.chomp.to_i == 1
      end
    end
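#purge! formats the pid into the `alive` run option, executes the result in a shell, and keeps the job only if the output is 1. The sketch below shows that pattern under the assumption of a POSIX shell. `ALIVE_TEMPLATE` is a hypothetical template invented for this example, not the value shipped with MiGA, and `alive?` and `purge` are standalone stand-ins.

```ruby
# Hypothetical template: prints 1 if the pid exists, 0 otherwise.
# Assumes a POSIX shell with a `kill` builtin.
ALIVE_TEMPLATE = 'kill -0 %d 2>/dev/null && echo 1 || echo 0'

# Format the pid into the template, run it, and interpret the output.
def alive?(pid, template = ALIVE_TEMPLATE)
  `#{sprintf(template, pid)}`.chomp.to_i == 1
end

# Keep only the jobs whose process is still alive.
def purge(jobs, template = ALIVE_TEMPLATE)
  jobs.select { |job| alive?(job[:pid], template) }
end

jobs = [{ pid: Process.pid, task_name: 'self' }]
puts "alive jobs: #{purge(jobs).map { |j| j[:task_name] }.inspect}"
```

Since the check is delegated to a shell command, the same code can serve different schedulers by changing only the template.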
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.
    # File 'lib/miga/daemon.rb', line 141
    def queue_job(job, ds=nil)
      return nil unless get_job(job, ds).nil?
      ds_name = (ds.nil? ? 'miga-project' : ds.name)
      say 'Queueing %s:%s' % [ds_name, job]
      vars = {
        'PROJECT' => project.path,
        'RUNTYPE' => runopts(:type),
        'CORES' => ppn,
        'MIGA' => MiGA::MiGA.root_path
      }
      vars['DATASET'] = ds.name unless ds.nil?
      log_dir = File.expand_path("daemon/#{job}", project.path)
      Dir.mkdir(log_dir) unless Dir.exist? log_dir
      task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
      to_run = {ds: ds, ds_name: ds_name, job: job, task_name: task_name,
        cmd: sprintf(runopts(:cmd),
          # 1: script
          MiGA::MiGA.script_path(job, miga:vars['MIGA'], project:project),
          # 2: vars
          vars.keys.map { |k| sprintf(runopts(:var), k, vars[k]) }.
            join(runopts(:varsep)),
          # 3: CPUs
          ppn,
          # 4: log file
          File.expand_path("#{ds_name}.log", log_dir),
          # 5: task name
          task_name)}
      @jobs_to_run << to_run
    end
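The command string is assembled from three run options (cmd, var, and varsep) and five positional arguments. The sketch below shows one way such templates could expand. The templates in `RUNOPTS_SKETCH`, the helper `build_cmd`, and every path are assumptions made for illustration; the real templates come from the project's daemon.json.

```ruby
# Hypothetical templates using numbered sprintf arguments.
RUNOPTS_SKETCH = {
  cmd: 'env %2$s bash %1$s > %4$s 2>&1',
  var: '%1$s=%2$s',
  varsep: ' '
}.freeze

# Expand the command template with the same five positional arguments
# used by #queue_job: script, variables, CPUs, log file, and task name.
def build_cmd(runopts, script, vars, cpus, log_file, task_name)
  var_str = vars.map { |k, v| sprintf(runopts[:var], k, v) }
                .join(runopts[:varsep])
  sprintf(runopts[:cmd], script, var_str, cpus, log_file, task_name)
end

cmd = build_cmd(
  RUNOPTS_SKETCH,
  '/opt/miga/scripts/assembly.bash',           # hypothetical script path
  { 'PROJECT' => '/tmp/proj', 'CORES' => 2 },  # hypothetical variables
  2,
  '/tmp/proj/daemon/assembly/ds1.log',
  'proj:assembly:ds1'
)
puts cmd
```

Numbered arguments let a template use the five values in any order or skip some of them; this hypothetical template, for instance, ignores the CPU count and the task name.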
#report_status ⇒ Object
Report status in a JSON file.
    # File 'lib/miga/daemon.rb', line 86
    def report_status
      f = File.open(File.expand_path('daemon/status.json', project.path), 'w')
      f.print JSON.pretty_generate(
        jobs_running: @jobs_running, jobs_to_run: @jobs_to_run)
      f.close
    end
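#report_status and #load_status form a save and restore pair around daemon/status.json. The sketch below is a simplified version of that round trip. Unlike the code above, it drops the dataset object before serializing and always re-attaches it by name from a plain Hash. The helpers `dump_status` and `restore_status` and the job names are assumptions made for illustration.

```ruby
require 'json'

# Serialize both queues, keeping only JSON-friendly fields
# (the :ds object is dropped; :ds_name is enough to find it again).
def dump_status(jobs_running, jobs_to_run)
  strip = ->(jobs) { jobs.map { |j| j.reject { |k, _| k == :ds } } }
  JSON.pretty_generate(
    jobs_running: strip.call(jobs_running),
    jobs_to_run: strip.call(jobs_to_run)
  )
end

# Parse the status and re-attach dataset objects looked up by name.
# Names without a dataset (project-wide jobs) get ds = nil.
def restore_status(json, datasets)
  status = JSON.parse(json, symbolize_names: true)
  status.each_value do |jobs|
    jobs.each { |j| j[:ds] = datasets[j[:ds_name]] }
  end
  status
end

ds1 = Object.new  # stand-in for a dataset object
json = dump_status(
  [{ ds: ds1, ds_name: 'ds1', job: :assembly, pid: 7 }],
  [{ ds: nil, ds_name: 'miga-project', job: :distances }]
)
status = restore_status(json, { 'ds1' => ds1 })
p status[:jobs_running].first[:job]   # a String after the round trip
p status[:jobs_to_run].first[:ds]     # nil for the project-wide job
```

One point to keep in mind: `symbolize_names: true` affects only Hash keys, so a job key stored as a Symbol comes back as a String after a JSON round trip.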
#say(*opts) ⇒ Object
Send a datestamped message to the log.
    # File 'lib/miga/daemon.rb', line 243
    def say(*opts)
      print "[#{Time.new.inspect}] ", *opts, "\n"
    end
#terminate ⇒ Object
Terminates a daemon.
    # File 'lib/miga/daemon.rb', line 249
    def terminate
      say 'Terminating daemon...'
      report_status
      k = runopts(:kill)
      @jobs_running.each do |i|
        `#{k % i[:pid]}`
        puts "Terminating pid:#{i[:pid]} for #{i[:task_name]}"
      end
      f = File.expand_path('daemon/alive', project.path)
      File.unlink(f) if File.exist? f
    end