Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Includes:
Base
Defined in:
lib/miga/daemon.rb,
lib/miga/daemon/base.rb

Overview

MiGA Daemons handling job submissions.

Defined Under Namespace

Modules: Base

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#latency, #maxjobs, #ppn, #restart, #runopts, #shutdown_when_done?, #start, #status, #stop

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?

Methods included from Common::Path

#root_path, #script_path

Methods included from Common::Format

#clean_fasta_file, #seqs_length, #tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon.



39
40
41
42
43
44
45
46
47
48
# File 'lib/miga/daemon.rb', line 39

# Builds an inactive daemon bound to +project+.
#
# Registers the new instance in the global $_MIGA_DAEMON_LAIR, stores the
# project, reads the run options from +daemon/daemon.json+ under the project
# path (JSON with symbolized keys), and starts with empty job queues and the
# loop counter at -1.
def initialize(project)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  config_file = File.expand_path('daemon/daemon.json', project.path)
  config_json = File.read(config_file)
  @runopts = JSON.parse(config_json, symbolize_names: true)
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_running ⇒ Object (readonly)

Array of jobs currently running.



32
33
34
# File 'lib/miga/daemon.rb', line 32

# Reader for the Array of jobs currently running. Returns the
# @jobs_running instance variable itself, not a copy.
def jobs_running
  @jobs_running
end

#jobs_to_run ⇒ Object (readonly)

Array of jobs next to be executed.



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Reader for the Array of jobs next to be executed. Returns the
# @jobs_to_run instance variable itself, not a copy.
def jobs_to_run
  @jobs_to_run
end

#loop_i ⇒ Object (readonly)

Integer indicating the current iteration.



34
35
36
# File 'lib/miga/daemon.rb', line 34

# Reader for the Integer indicating the current iteration (@loop_i).
# The constructor sets it to -1; #in_loop advances and resets it.
def loop_i
  @loop_i
end

#options ⇒ Object (readonly)

Options used to setup the daemon.



28
29
30
# File 'lib/miga/daemon.rb', line 28

# Reader for the options used to set up the daemon (@options).
# NOTE(review): the constructor shown for this class assigns @runopts but
# never @options; confirm where @options is populated, otherwise this
# reader returns nil.
def options
  @options
end

#project ⇒ Object (readonly)

MiGA::Project in which the daemon is running.



26
27
28
# File 'lib/miga/daemon.rb', line 26

# Reader for the MiGA::Project in which the daemon is running (@project),
# as passed to the constructor.
def project
  @project
end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.



16
17
18
19
20
# File 'lib/miga/daemon.rb', line 16

# When was a daemon for +project+ last seen active?
#
# Reads the timestamp stored in +daemon/alive+ under the project path and
# returns it parsed as a DateTime, or nil when that file does not exist.
def self.last_alive(project)
  alive_file = File.expand_path('daemon/alive', project.path)
  if File.exist?(alive_file)
    DateTime.parse(File.read(alive_file))
  else
    nil
  end
end

Instance Method Details

#check_datasets ⇒ Object

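Traverse datasets, queueing the next pending preprocessing task of each one. If a listed dataset is not loaded, the project is reloaded instead.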



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/miga/daemon.rb', line 114

# Walks every dataset of the project and queues its next preprocessing task.
#
# A dataset that is listed but yields nil triggers a warning and a project
# reload. For any other dataset, the task returned by
# +next_preprocessing(true)+ is passed to #queue_job unless it is nil.
def check_datasets
  project.each_dataset do |name, dataset|
    if dataset.nil?
      say "Warning: Dataset #{name} listed but not loaded, reloading project"
      # Hand the reload result back as the block value, as the if/else form did
      next project.load
    end
    pending = dataset.next_preprocessing(true)
    queue_job(pending, dataset) unless pending.nil?
  end
end

#check_project ⇒ Object

Check if all reference datasets are pre-processed. If yes, check the project-level tasks.



129
130
131
132
133
134
135
# File 'lib/miga/daemon.rb', line 129

# Queues the next project-level task, if there is one.
#
# Nothing happens when the project has no datasets or when
# +done_preprocessing?(false)+ is falsy. Otherwise the task comes from
# +next_distances(true)+, falling back to +next_inclade(true)+ when the former
# is nil. Returns the result of #queue_job, or nil when nothing was queued.
def check_project
  ready = !project.dataset_names.empty? && project.done_preprocessing?(false)
  return unless ready
  task = project.next_distances(true)
  task = project.next_inclade(true) if task.nil?
  return if task.nil?
  queue_job(task)
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.



67
68
69
70
71
72
73
74
# File 'lib/miga/daemon.rb', line 67

# Launches the +task+ with options +opts+ (as command-line arguments).
# Supported tasks include: start, stop, restart, status.
#
# The argument vector handed to Daemons is a new Array with +task+ placed in
# front of +opts+; the caller's +opts+ Array is left untouched. The daemonized
# block keeps calling #in_loop until it returns a falsy value.
def daemon(task, opts=[])
  options = default_options
  options[:ARGV] = [task, *opts]
  Daemons.run_proc("MiGA:#{project.name}", options) do
    loop { break unless in_loop }
  end
end

#declare_alive ⇒ Object

Tell the world that you’re alive.



78
79
80
81
82
# File 'lib/miga/daemon.rb', line 78

# Tell the world that you're alive: overwrites +daemon/alive+ under the
# project path with the current time (+Time.now.to_s+). Returns nil.
#
# The block form of File.open is used so the file handle is closed even if
# writing raises.
def declare_alive
  File.open(File.expand_path('daemon/alive', project.path), 'w') do |f|
    f.print Time.now.to_s
  end
end

#default_options ⇒ Object

Returns Hash containing the default options for the daemon.



59
60
61
62
# File 'lib/miga/daemon.rb', line 59

# Default options Hash for the daemon: normal directory mode rooted at the
# +daemon+ folder of the project path, no multiple instances, and log output
# enabled.
def default_options
  daemon_dir = File.expand_path('daemon', project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end

#flush! ⇒ Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/miga/daemon.rb', line 187

# Drops finished jobs from the running list and launches queued jobs until
# #maxjobs are running or the queue is empty.
#
# A running job counts as finished when +add_result(job[:job], false)+ on its
# dataset (or on the project, for jobs without a dataset) returns non-nil.
def flush!
  # Forget every job whose result can now be registered
  @jobs_running.reject! do |job|
    owner = job[:ds].nil? ? project : job[:ds]
    result = owner.add_result(job[:job], false)
    next false if result.nil?
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}."
    true
  end
  # Random rotation of the queue, so single datasets do not hog resources
  offset = rand(jobs_to_run.size)
  @jobs_to_run.rotate!(offset)
  # Fill the free slots from the front of the queue
  while jobs_running.size < maxjobs && !jobs_to_run.empty?
    launch_job(@jobs_to_run.shift)
  end
end

#get_job(job, ds = nil) ⇒ Object

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.



174
175
176
177
178
179
180
181
182
# File 'lib/miga/daemon.rb', line 174

# Finds the task with key +job+ for dataset +ds+ among the queued and running
# jobs (queued ones are searched first). With +ds+ nil only project-wide
# entries (those without a dataset) are considered; otherwise datasets are
# matched by name. Returns the job Hash, or nil when there is no match.
def get_job(job, ds=nil)
  candidates = jobs_to_run + jobs_running
  candidates.find do |entry|
    entry_ds = entry[:ds]
    same_target =
      if ds.nil?
        entry_ds.nil?
      else
        !entry_ds.nil? && entry_ds.name == ds.name
      end
    same_target && entry[:job] == job
  end
end

#in_loop ⇒ Object

Run one loop step. Returns a Boolean indicating if the loop should continue.



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/miga/daemon.rb', line 213

# Runs one step of the daemon loop and tells the caller whether to continue.
#
# Each step declares the daemon alive, reloads the project, checks datasets
# and project-level tasks, flushes the queues, reports the status, and sleeps
# for #latency. The very first step (loop counter at -1) also prints a launch
# banner and loads the previous status. Every fourth step dead jobs are purged
# and the counter is reset.
#
# Returns false when #shutdown_when_done? holds and no jobs are queued or
# running; true otherwise.
def in_loop
  declare_alive
  project.load
  if loop_i == -1
    banner = '-----------------------------------'
    say banner
    say 'MiGA:%s launched.' % project.name
    say banner
    load_status
    @loop_i = 0
  end
  @loop_i += 1
  check_datasets
  check_project
  flush!
  if loop_i == 4
    say 'Housekeeping for sanity'
    @loop_i = 0
    purge!
  end
  report_status
  sleep(latency)
  keep_going =
    !(shutdown_when_done? && (jobs_running.size + jobs_to_run.size).zero?)
  say 'Nothing else to do, shutting down.' unless keep_going
  keep_going
end

#last_alive ⇒ Object

When was the last time a daemon for the current project was seen active? Returns DateTime.



53
54
55
# File 'lib/miga/daemon.rb', line 53

# When was a daemon for the current project last seen active? Delegates to
# MiGA::Daemon.last_alive, passing #project, and returns its result.
def last_alive
  MiGA::Daemon.last_alive(project)
end

#load_status ⇒ Object

Load the status of a previous instance.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/miga/daemon.rb', line 95

# Load the status of a previous instance from +daemon/status.json+ under the
# project path. Does nothing when that file is missing or empty.
#
# Every stored job that had a dataset gets its +:ds+ entry replaced by the
# dataset looked up by +:ds_name+ in the project. The +:job+ key is converted
# back to a Symbol: JSON stores it as a String, while #queue_job and #get_job
# work with Symbol keys, so without the conversion reloaded jobs would never
# match and would be queued again. Dead jobs are then dropped with #purge!.
def load_status
  f_path = File.expand_path('daemon/status.json', project.path)
  return unless File.size? f_path
  say 'Loading previous status in daemon/status.json:'
  status = JSON.parse(File.read(f_path), symbolize_names: true)
  status.each_value do |jobs|
    jobs.each do |job|
      job[:ds] = project.dataset(job[:ds_name]) unless job[:ds].nil?
      job[:job] = job[:job].to_sym unless job[:job].nil?
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run  = status[:jobs_to_run]
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end

#purge! ⇒ Object

Remove dead jobs.



205
206
207
208
209
# File 'lib/miga/daemon.rb', line 205

# Remove dead jobs from the running list.
#
# For each running job, the +runopts(:alive)+ template is filled with the job
# pid and executed in a subshell; the job is kept only when the command output
# converts to the integer 1.
def purge!
  @jobs_running.select! do |job|
    probe_cmd = sprintf(runopts(:alive), job[:pid])
    answer = `#{probe_cmd}`
    answer.chomp.to_i == 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/miga/daemon.rb', line 141

# Add the task to the internal queue with symbol key +job+. If the task is
# dataset-specific, +ds+ specifies the dataset; leave it nil for project-wide
# tasks. To submit jobs to the scheduler (or to bash) see #flush!.
#
# Returns nil without queueing anything when #get_job already finds the same
# task; otherwise returns the updated queue. The log directory
# +daemon/<job>+ under the project path is created when missing.
def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  say 'Queueing %s:%s' % [ds_name, job]
  # Environment variables handed to the job script
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES'   => ppn,
    'MIGA'    => MiGA::MiGA.root_path
  }
  vars['DATASET'] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  # Task name: first ten characters of the project name, then job and dataset.
  # NOTE(review): this line previously read `project.[:name]`, which does not
  # parse; `project.name` is the accessor used elsewhere in this class --
  # confirm it matches the originally intended lookup.
  task_name = "#{project.name[0..9]}:#{job}:#{ds_name}"
  to_run = {ds: ds, ds_name: ds_name, job: job, task_name: task_name,
    cmd: sprintf(runopts(:cmd),
      # 1: script
      MiGA::MiGA.script_path(job, miga:vars['MIGA'], project:project),
      # 2: vars
      vars.keys.map { |k| sprintf(runopts(:var), k, vars[k]) }.
        join(runopts(:varsep)),
      # 3: CPUs
      ppn,
      # 4: log file
      File.expand_path("#{ds_name}.log", log_dir),
      # 5: task name
      task_name)}
  @jobs_to_run << to_run
end

#report_status ⇒ Object

Report status in a JSON file.



86
87
88
89
90
91
# File 'lib/miga/daemon.rb', line 86

# Report status in a JSON file: overwrites +daemon/status.json+ under the
# project path with the pretty-printed running and queued job lists.
# Returns nil.
#
# The block form of File.open is used so the file handle is closed even if
# serialization or writing raises.
def report_status
  status = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  File.open(File.expand_path('daemon/status.json', project.path), 'w') do |f|
    f.print JSON.pretty_generate(status)
  end
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



243
244
245
# File 'lib/miga/daemon.rb', line 243

# Send a datestamped message to the log: prints the current time (as given by
# +Time#inspect+) in square brackets, then every element of +opts+, then a
# newline.
def say(*opts)
  stamp = Time.new.inspect
  print("[#{stamp}] ", *opts, "\n")
end

#terminate ⇒ Object

Terminates a daemon.



249
250
251
252
253
254
255
256
257
258
259
# File 'lib/miga/daemon.rb', line 249

# Terminates a daemon.
#
# Writes a final status report, runs the +runopts(:kill)+ command (filled with
# the pid) for every running job, printing one line per job, and finally
# removes +daemon/alive+ under the project path when it exists.
def terminate
  say 'Terminating daemon...'
  report_status
  kill_template = runopts(:kill)
  @jobs_running.each do |job|
    kill_cmd = kill_template % job[:pid]
    `#{kill_cmd}`
    puts "Terminating pid:#{job[:pid]} for #{job[:task_name]}"
  end
  alive_file = File.expand_path('daemon/alive', project.path)
  return unless File.exist?(alive_file)
  File.unlink(alive_file)
end