Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Defined in:
lib/miga/daemon.rb

Overview

MiGA Daemons handling job submissions.

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, clean_fasta_file, initialized?, #result_files_exist?, root_path, tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon.



35
36
37
38
39
40
41
42
43
# File 'lib/miga/daemon.rb', line 35

# Set up an inactive daemon bound to +project+.
#
# Reads the run options from the JSON file <tt>daemon/daemon.json</tt>
# located under <tt>project.path</tt> (keys are symbolized), and starts with
# empty job queues. The loop counter starts at -1, which #in_loop treats as
# "not yet launched".
def initialize(project)
  @project = project
  config_path = File.expand_path("daemon/daemon.json", project.path)
  @runopts = JSON.parse(File.read(config_path), { symbolize_names: true })
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_runningObject (readonly)

Array of jobs currently running.



28
29
30
# File 'lib/miga/daemon.rb', line 28

# Array of jobs that have been launched and are not yet known to be finished.
def jobs_running() @jobs_running ; end

#jobs_to_runObject (readonly)

Array of jobs next to be executed.



26
27
28
# File 'lib/miga/daemon.rb', line 26

# Array of queued jobs waiting to be launched.
def jobs_to_run() @jobs_to_run ; end

#loop_iObject (readonly)

Integer indicating the current iteration.



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Integer counter of the current loop iteration (-1 before the first one).
def loop_i() @loop_i ; end

#optionsObject (readonly)

Options used to setup the daemon.



24
25
26
# File 'lib/miga/daemon.rb', line 24

# Options used to setup the daemon.
# NOTE(review): no code visible here assigns @options, so this may return
# nil unless it is set elsewhere -- confirm.
def options() @options ; end

#projectObject (readonly)

MiGA::Project in which the daemon is running.



22
23
24
# File 'lib/miga/daemon.rb', line 22

# The project object this daemon was initialized with.
def project() @project ; end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.



15
16
17
18
19
# File 'lib/miga/daemon.rb', line 15

# When was a daemon for +project+ last seen active?
#
# Parses the contents of <tt>daemon/alive</tt> under <tt>project.path</tt>.
# Returns a DateTime, or nil if that file is missing or empty.
def self.last_alive(project)
  alive_file = File.expand_path("daemon/alive", project.path)
  File.size?(alive_file) ? DateTime.parse(File.read(alive_file)) : nil
end

Instance Method Details

#check_datasetsObject

Traverse datasets



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/miga/daemon.rb', line 130

# Traverse the project's datasets and queue the next pending task of each.
#
# A dataset that is listed but yields nil triggers a warning and a project
# reload. For every other dataset, the value of
# <tt>next_preprocessing(true)</tt> is queued through #queue_job unless nil.
def check_datasets
  project.each_dataset do |name, dataset|
    if dataset.nil?
      say "Warning: Dataset #{name} listed but not loaded, reloading project."
      project.load
      next
    end
    pending = dataset.next_preprocessing(true)
    next if pending.nil?
    queue_job(pending, dataset)
  end
end

#check_projectObject

Check if all reference datasets are pre-processed. If so, check the project-level tasks.



145
146
147
148
149
150
151
152
# File 'lib/miga/daemon.rb', line 145

# Queue the next project-level task, if any.
#
# Does nothing when the project has no datasets or when
# <tt>project.done_preprocessing?(false)</tt> is falsy. Otherwise takes the
# value of <tt>next_distances(true)</tt>, falling back to
# <tt>next_inclade(true)</tt> when that is nil, and queues it unless nil.
def check_project
  return if project.dataset_names.empty?
  return unless project.done_preprocessing?(false)
  pending = project.next_distances(true)
  pending = project.next_inclade(true) if pending.nil?
  queue_job(pending) unless pending.nil?
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.



111
112
113
114
115
116
117
118
# File 'lib/miga/daemon.rb', line 111

# Launch +task+ with options +opts+ (as command-line arguments) through
# Daemons.run_proc. Tasks used by the wrappers in this class are: start,
# stop, restart, and status.
#
# The argument vector handed to Daemons is <tt>[task, *opts]</tt>, built as a
# new Array so the caller's +opts+ is never modified (the same array can be
# safely reused across several calls). The daemonized process keeps calling
# #in_loop until it returns a falsy value.
def daemon(task, opts=[])
  run_options = default_options
  run_options[:ARGV] = [task, *opts]
  Daemons.run_proc("MiGA:#{project.name}", run_options) do
    loop { break unless in_loop }
  end
end

#declare_aliveObject

Tell the world that you’re alive



122
123
124
125
126
# File 'lib/miga/daemon.rb', line 122

# Tell the world that you're alive: overwrite <tt>daemon/alive</tt> under the
# project path with the current time (<tt>Time.now.to_s</tt>).
#
# The block form of File.open guarantees the file handle is closed even if
# writing raises.
def declare_alive
  File.open(File.expand_path("daemon/alive", project.path), "w") do |f|
    f.print Time.now.to_s
  end
end

#default_optionsObject

Returns Hash containing the default options for the daemon.



54
55
56
57
# File 'lib/miga/daemon.rb', line 54

# Returns a new Hash with the default options handed to Daemons: files are
# kept in the <tt>daemon</tt> folder under the project path, multiple
# instances are disabled, and output logging is enabled.
def default_options
  daemon_dir = File.expand_path("daemon", project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end

#flush!Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/miga/daemon.rb', line 200

# Remove finished jobs from the internal queue and launch as many queued jobs
# as possible respecting #maxjobs.
#
# A running job is considered finished when +add_result+ (called on its
# dataset, or on the project for project-wide jobs) returns non-nil.
#
# Jobs are launched with Kernel#spawn when the +:type+ run option is "bash";
# otherwise the job command is executed and its chomped output is taken as
# the job ID. A launch yielding nil, an empty String, or 0 as ID is treated
# as unsuccessful: the job goes back to the end of the queue.
#
# Each queued job gets at most one launch attempt per call, so a job that
# keeps failing cannot stall the daemon inside this method.
#
# Returns nil.
def flush!
  # Check for finished jobs
  @jobs_running.select! do |job|
    owner = job[:ds].nil? ? project : job[:ds]
    r = owner.add_result(job[:job], false)
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless r.nil?
    r.nil?
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Launch as many +jobs_to_run+ as possible, one attempt per queued job
  jobs_to_run.size.times do
    break unless jobs_running.size < maxjobs
    job = @jobs_to_run.shift
    # Launch job
    if runopts(:type) == "bash"
      job[:pid] = spawn job[:cmd]
      Process.detach job[:pid] unless [nil, "", 0].include? job[:pid]
    else
      job[:pid] = `#{job[:cmd]}`.chomp
    end
    # Check if registered (test the ID itself, not whether it is nil)
    if [nil, "", 0].include? job[:pid]
      job[:pid] = nil
      @jobs_to_run << job
      say "Unsuccessful #{job[:task_name]}, rescheduling."
    else
      @jobs_running << job
      say "Spawned pid:#{job[:pid]} for #{job[:task_name]}."
    end
  end
  nil
end

#get_job(job, ds = nil) ⇒ Object

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.



187
188
189
190
191
192
193
194
195
# File 'lib/miga/daemon.rb', line 187

# Find the queued or running job entry with key +job+ for dataset +ds+.
#
# With +ds+ nil only project-wide entries (those without a dataset) match;
# otherwise entries match when their dataset has the same name as +ds+.
# Queued jobs are searched before running ones. Returns the matching Hash
# or nil.
def get_job(job, ds=nil)
  known_jobs = jobs_to_run + jobs_running
  known_jobs.find do |entry|
    next false unless entry[:job] == job
    if ds.nil?
      entry[:ds].nil?
    else
      !entry[:ds].nil? && entry[:ds].name == ds.name
    end
  end
end

#in_loopObject

Run one loop step. Returns a Boolean indicating if the loop should continue.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/miga/daemon.rb', line 242

# Run one loop step. Returns a Boolean indicating if the loop should
# continue.
#
# Each step: announces the launch (first step only), records that the daemon
# is alive, reloads the project, checks datasets and project-level tasks,
# and flushes the queues. Every fourth step it also purges dead jobs. After
# sleeping for #latency seconds, it returns false only when
# #shutdown_when_done? is set and no jobs are queued or running.
def in_loop
  if loop_i == -1
    rule = "-----------------------------------"
    say rule
    say "MiGA:#{project.name} launched."
    say rule
    @loop_i = 0
  end
  @loop_i += 1
  declare_alive
  project.load
  check_datasets
  check_project
  flush!
  if loop_i == 4
    say "Housekeeping for sanity"
    @loop_i = 0
    purge!
  end
  sleep(latency)
  all_done = shutdown_when_done? &&
    jobs_running.size + jobs_to_run.size == 0
  !all_done
end

#last_aliveObject

When was the last time a daemon for the current project was seen active? Returns DateTime.



48
49
50
# File 'lib/miga/daemon.rb', line 48

# When was a daemon for the current project last seen active? Delegates to
# MiGA::Daemon.last_alive with this daemon's project.
def last_alive
  MiGA::Daemon.last_alive(project)
end

#latencyObject

Returns Integer indicating the number of seconds to sleep between checks.



77
# File 'lib/miga/daemon.rb', line 77

# Value of the +:latency+ run option: the number of seconds #in_loop sleeps
# between checks.
def latency
  runopts(:latency)
end

#maxjobsObject

Returns Integer indicating the maximum number of concurrent jobs to run.



81
# File 'lib/miga/daemon.rb', line 81

# Value of the +:maxjobs+ run option: the maximum number of concurrent jobs
# to run.
def maxjobs
  runopts(:maxjobs)
end

#ppnObject

Returns Integer indicating the number of CPUs per job.



85
# File 'lib/miga/daemon.rb', line 85

# Value of the +:ppn+ run option: the number of CPUs per job.
def ppn
  runopts(:ppn)
end

#purge!Object

Remove dead jobs.



234
235
236
237
238
# File 'lib/miga/daemon.rb', line 234

# Remove dead jobs from the list of running jobs.
#
# For each job, the +:alive+ run option is used as a format string with the
# job's ID to build a command; the job is kept only if the integer value of
# that command's output is 1.
def purge!
  @jobs_running.reject! do |job|
    probe = sprintf(runopts(:alive), job[:pid])
    `#{probe}`.chomp.to_i != 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/miga/daemon.rb', line 158

# Add the task with symbol key +job+ to the internal queue. If the task is
# dataset-specific, +ds+ specifies the dataset; project-wide tasks use nil.
# To submit jobs to the scheduler (or to bash) see #flush!.
#
# Returns nil without queueing if the same task is already queued or
# running (see #get_job); otherwise returns the updated queue.
#
# The job command is built from the +:cmd+ run option, a format string that
# receives, in order: (1) the task script, (2) the environment variables
# (each formatted with +:var+ and joined with +:varsep+), (3) the number of
# CPUs, (4) the log file, and (5) the task name. The folder
# <tt>daemon/JOB</tt> under the project path is created for the logs if it
# doesn't exist yet.
def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? "miga-project" : ds.name)
  say "Queueing ", ds_name, ":#{job}"
  vars = { "PROJECT"=>project.path, "RUNTYPE"=>runopts(:type),
    "CORES"=>ppn, "MIGA"=>MiGA::MiGA.root_path }
  vars["DATASET"] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  # Task names start with (up to) the first 10 characters of the project name.
  # NOTE(review): this line was malformed (+project.[:name]+); it now uses
  # project.name like the rest of this class -- confirm against upstream.
  task_name = "#{project.name[0..9]}:#{job}:#{ds_name}"
  var_list = vars.map { |k, v| sprintf(runopts(:var), k, v) }
  to_run = {ds: ds, job: job, task_name: task_name,
    cmd: sprintf(runopts(:cmd),
      # 1: script
      File.expand_path("scripts/#{job}.bash", vars["MIGA"]),
      # 2: vars
      var_list.join(runopts(:varsep)),
      # 3: CPUs
      ppn,
      # 4: log file
      File.expand_path("#{ds_name}.log", log_dir),
      # 5: task name
      task_name)}
  @jobs_to_run << to_run
end

#restart(opts = []) ⇒ Object

Restarts the daemon with opts.



102
# File 'lib/miga/daemon.rb', line 102

# Restarts the daemon with +opts+ (see #daemon).
def restart(opts=[])
  daemon("restart", opts)
end

#runopts(k, v = nil, force = false) ⇒ Object

Set/get #options, where k is the Symbol of the option and v is the value (or nil to use as getter). Skips consistency tests if force. Returns new value.



63
64
65
66
67
68
69
70
71
72
73
# File 'lib/miga/daemon.rb', line 63

# Set/get a run option, where +k+ is the name of the option (converted to
# Symbol) and +v+ is the new value, or nil to use as getter.
#
# When setting, +:latency+, +:maxjobs+, and +:ppn+ are converted to Integer,
# and +:shutdown_when_done+ is converted to Boolean. Raises a RuntimeError
# if the value is the Integer zero, unless +force+ is set.
#
# Returns the (new) value of the option.
def runopts(k, v=nil, force=false)
  key = k.to_sym
  return @runopts[key] if v.nil?
  value = v
  value = value.to_i if [:latency, :maxjobs, :ppn].include? key
  if !force && value.is_a?(Integer) && value == 0
    raise "Daemon's #{key} cannot be set to zero."
  end
  value = !!value if [:shutdown_when_done].include? key
  @runopts[key] = value
  @runopts[key]
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



269
270
271
# File 'lib/miga/daemon.rb', line 269

# Print a datestamped message: the current time in square brackets, followed
# by all of +opts+ and a newline.
def say(*opts)
  stamp = "[#{Time.new.inspect}] "
  print(stamp, *opts, "\n")
end

#shutdown_when_done?Boolean

Returns Boolean indicating if the daemon should shutdown when processing is complete.

Returns:

  • (Boolean)


90
# File 'lib/miga/daemon.rb', line 90

# Should the daemon shutdown when processing is complete? Returns the
# +:shutdown_when_done+ run option coerced to Boolean.
def shutdown_when_done?
  runopts(:shutdown_when_done) ? true : false
end

#start(opts = []) ⇒ Object

Initializes the daemon with opts.



94
# File 'lib/miga/daemon.rb', line 94

# Starts the daemon with +opts+ (see #daemon).
def start(opts=[])
  daemon("start", opts)
end

#status(opts = []) ⇒ Object

Returns the status of the daemon with opts.



106
# File 'lib/miga/daemon.rb', line 106

# Reports the status of the daemon with +opts+ (see #daemon).
def status(opts=[])
  daemon("status", opts)
end

#stop(opts = []) ⇒ Object

Stops the daemon with opts.



98
# File 'lib/miga/daemon.rb', line 98

# Stops the daemon with +opts+ (see #daemon).
def stop(opts=[])
  daemon("stop", opts)
end