Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Defined in:
lib/miga/daemon.rb

Overview

MiGA Daemons handling job submissions.

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, clean_fasta_file, initialized?, #result_files_exist?, root_path, script_path, seqs_length, tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an unactive daemon for the MiGA::Project project. See #daemon to wake the daemon.



39
40
41
42
43
44
45
46
47
48
# File 'lib/miga/daemon.rb', line 39

def initialize(project)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  @runopts = JSON.parse(
        File.read(File.expand_path('daemon/daemon.json', project.path)),
        symbolize_names: true)
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_runningObject (readonly)

Array of jobs currently running.



32
33
34
# File 'lib/miga/daemon.rb', line 32

def jobs_running
  @jobs_running
end

#jobs_to_runObject (readonly)

Array of jobs next to be executed.



30
31
32
# File 'lib/miga/daemon.rb', line 30

def jobs_to_run
  @jobs_to_run
end

#loop_iObject (readonly)

Integer indicating the current iteration.



34
35
36
# File 'lib/miga/daemon.rb', line 34

def loop_i
  @loop_i
end

#optionsObject (readonly)

Options used to setup the daemon.



28
29
30
# File 'lib/miga/daemon.rb', line 28

def options
  @options
end

#projectObject (readonly)

MiGA::Project in which the daemon is running.



26
27
28
# File 'lib/miga/daemon.rb', line 26

def project
  @project
end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.



15
16
17
18
19
# File 'lib/miga/daemon.rb', line 15

def self.last_alive(project)
  f = File.expand_path('daemon/alive', project.path)
  return nil unless File.exist? f
  DateTime.parse(File.read(f))
end

Instance Method Details

#check_datasetsObject

Traverse datasets



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/miga/daemon.rb', line 153

def check_datasets
  project.each_dataset do |n, ds|
    if ds.nil?
      say "Warning: Dataset #{n} listed but not loaded, reloading project."
      project.load
    else
      to_run = ds.next_preprocessing(true)
      queue_job(to_run, ds) unless to_run.nil?
    end
  end
end

#check_projectObject

Check if all reference datasets are pre-processed. If yes, check the project-level tasks



168
169
170
171
172
173
174
# File 'lib/miga/daemon.rb', line 168

def check_project
  return if project.dataset_names.empty?
  return unless project.done_preprocessing?(false)
  to_run = project.next_distances(true)
  to_run = project.next_inclade(true) if to_run.nil?
  queue_job(to_run) unless to_run.nil?
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.



125
126
127
128
129
130
131
132
# File 'lib/miga/daemon.rb', line 125

def daemon(task, opts=[])
  options = default_options
  opts.unshift(task)
  options[:ARGV] = opts
  Daemons.run_proc("MiGA:#{project.name}", options) do
    loop { break unless in_loop }
  end
end

#declare_aliveObject

Tell the world that you’re alive.



136
137
138
139
140
# File 'lib/miga/daemon.rb', line 136

def declare_alive
  f = File.open(File.expand_path('daemon/alive', project.path), 'w')
  f.print Time.now.to_s
  f.close
end

#default_optionsObject

Returns Hash containing the default options for the daemon.



59
60
61
62
# File 'lib/miga/daemon.rb', line 59

def default_options
  { dir_mode: :normal, dir: File.expand_path('daemon', project.path),
    multiple: false, log_output: true }
end

#flush!Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/miga/daemon.rb', line 226

def flush!
  # Check for finished jobs
  @jobs_running.select! do |job|
    r = (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false)
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless r.nil?
    r.nil?
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Launch as many +jobs_to_run+ as possible
  while jobs_running.size < maxjobs
    break if jobs_to_run.empty?
    launch_job @jobs_to_run.shift
  end
end

#get_job(job, ds = nil) ⇒ Object

Get the taks with key symbol job in dataset ds. For project-wide tasks let ds be nil.



213
214
215
216
217
218
219
220
221
# File 'lib/miga/daemon.rb', line 213

def get_job(job, ds=nil)
  (jobs_to_run + jobs_running).find do |j|
    if ds==nil
      j[:ds].nil? and j[:job]==job
    else
      (! j[:ds].nil?) and j[:ds].name==ds.name and j[:job]==job
    end
  end
end

#in_loopObject

Run one loop step. Returns a Boolean indicating if the loop should continue.



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/miga/daemon.rb', line 252

def in_loop
  if loop_i == -1
    say '-----------------------------------'
    say 'MiGA:%s launched.' % project.name
    say '-----------------------------------'
    @loop_i = 0
  end
  @loop_i += 1
  declare_alive
  project.load
  check_datasets
  check_project
  flush!
  if loop_i==4
    say 'Housekeeping for sanity'
    @loop_i = 0
    purge!
  end
  report_status
  sleep(latency)
  if shutdown_when_done? and jobs_running.size+jobs_to_run.size == 0
    say 'Nothing else to do, shutting down.'
    return false
  end
  true
end

#last_aliveObject

When was the last time a daemon for the current project was seen active? Returns DateTime.



53
54
55
# File 'lib/miga/daemon.rb', line 53

def last_alive
  MiGA::Daemon.last_alive project
end

#latencyObject

Returns Integer indicating the number of seconds to sleep between checks.



91
# File 'lib/miga/daemon.rb', line 91

def latency() runopts(:latency); end

#maxjobsObject

Returns Integer indicating the maximum number of concurrent jobs to run.



95
# File 'lib/miga/daemon.rb', line 95

def maxjobs() runopts(:maxjobs); end

#ppnObject

Returns Integer indicating the number of CPUs per job.



99
# File 'lib/miga/daemon.rb', line 99

def ppn() runopts(:ppn); end

#purge!Object

Remove dead jobs.



244
245
246
247
248
# File 'lib/miga/daemon.rb', line 244

def purge!
  @jobs_running.select! do |job|
    `#{sprintf(runopts(:alive), job[:pid])}`.chomp.to_i == 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/miga/daemon.rb', line 180

def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  say 'Queueing %s:%s' % [ds_name, job]
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES'   => ppn,
    'MIGA'    => MiGA::MiGA.root_path
  }
  vars['DATASET'] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  task_name = "#{project.[:name][0..9]}:#{job}:#{ds_name}"
  to_run = {ds: ds, ds_name: ds_name, job: job, task_name: task_name,
    cmd: sprintf(runopts(:cmd),
      # 1: script
      MiGA::MiGA.script_path(job, miga:vars['MIGA'], project:project),
      # 2: vars
      vars.keys.map { |k| sprintf(runopts(:var), k, vars[k]) }.
        join(runopts(:varsep)),
      # 3: CPUs
      ppn,
      # 4: log file
      File.expand_path("#{ds_name}.log", log_dir),
      # 5: task name
      task_name)}
  @jobs_to_run << to_run
end

#report_statusObject

Report status in a JSON file.



144
145
146
147
148
149
# File 'lib/miga/daemon.rb', line 144

def report_status
  f = File.open(File.expand_path('daemon/status.json', project.path), 'w')
  f.print JSON.pretty_generate(
    jobs_running:@jobs_running, jobs_to_run:@jobs_to_run)
  f.close
end

#restart(opts = []) ⇒ Object

Restarts the daemon with opts.



116
# File 'lib/miga/daemon.rb', line 116

def restart(opts=[]) daemon('restart', opts); end

#runopts(k, v = nil, force = false) ⇒ Object

Set/get #options, where k is the Symbol of the option and v is the value (or nil to use as getter). Skips consistency tests if force. Returns new value.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/miga/daemon.rb', line 68

def runopts(k, v=nil, force=false)
  k = k.to_sym
  unless v.nil?
    if [:latency, :maxjobs, :ppn].include?(k)
      v = v.to_i
    elsif [:shutdown_when_done].include?(k)
      v = !!v
    end
    raise "Daemon's #{k} cannot be set to zero." if !force and v==0
    @runopts[k] = v
  end
  if k==:kill and v.nil?
    case @runopts[:type].to_s
    when 'bash' then return "kill -9 '%s'"
    when 'qsub' then return "qdel '%s'"
    else             return "canceljob '%s'"
    end
  end
  @runopts[k]
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



281
282
283
# File 'lib/miga/daemon.rb', line 281

def say(*opts)
  print "[#{Time.new.inspect}] ", *opts, "\n"
end

#shutdown_when_done?Boolean

Returns Boolean indicating if the daemon should shutdown when processing is complete.

Returns:

  • (Boolean)


104
# File 'lib/miga/daemon.rb', line 104

def shutdown_when_done?() !!runopts(:shutdown_when_done); end

#start(opts = []) ⇒ Object

Initializes the daemon with opts.



108
# File 'lib/miga/daemon.rb', line 108

def start(opts=[]) daemon('start', opts); end

#status(opts = []) ⇒ Object

Returns the status of the daemon with opts.



120
# File 'lib/miga/daemon.rb', line 120

def status(opts=[]) daemon('status', opts); end

#stop(opts = []) ⇒ Object

Stops the daemon with opts.



112
# File 'lib/miga/daemon.rb', line 112

def stop(opts=[]) daemon('stop', opts); end

#terminateObject

Terminates a daemon.



287
288
289
290
291
292
293
294
295
296
297
# File 'lib/miga/daemon.rb', line 287

def terminate
  say 'Terminating daemon...'
  report_status
  k = runopts(:kill)
  @jobs_running.each do |i|
    `#{k % i[:pid]}`
    puts "Terminating pid:#{i[:pid]} for #{i[:task_name]}"
  end
  f = File.expand_path('daemon/alive', project.path)
  File.unlink(f) if File.exist? f
end