Class: MiGA::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/miga/daemon.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(p) ⇒ Daemon

Returns a new instance of Daemon.



21
22
23
24
25
26
27
28
# File 'lib/miga/daemon.rb', line 21

def initialize(p)
	 @project = p
	 @runopts = JSON.parse(
	    File.read(File.expand_path("daemon/daemon.json", project.path)),
	    {:symbolize_names=>true})
	 @jobs_to_run = []
	 @jobs_running = []
end

Instance Attribute Details

#jobs_runningObject (readonly)

Returns the value of attribute jobs_running.



20
21
22
# File 'lib/miga/daemon.rb', line 20

def jobs_running
  @jobs_running
end

#jobs_to_runObject (readonly)

Returns the value of attribute jobs_to_run.



20
21
22
# File 'lib/miga/daemon.rb', line 20

def jobs_to_run
  @jobs_to_run
end

#optionsObject (readonly)

Returns the value of attribute options.



20
21
22
# File 'lib/miga/daemon.rb', line 20

def options
  @options
end

#projectObject (readonly)

Returns the value of attribute project.



20
21
22
# File 'lib/miga/daemon.rb', line 20

def project
  @project
end

Class Method Details

.last_alive(p) ⇒ Object



14
15
16
17
18
# File 'lib/miga/daemon.rb', line 14

def self.last_alive(p)
   f = File.expand_path("daemon/alive", p.path)
	 return nil unless File.size? f
	 DateTime.parse(File.read(f))
end

Instance Method Details

#daemon(task, opts = []) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/miga/daemon.rb', line 53

def daemon(task, opts=[])
	 options = default_options
	 opts.unshift(task)
	 options[:ARGV] = opts
	 Daemons.run_proc("MiGA:#{project.[:name]}", options) do
	    p = project
	    say "-----------------------------------"
	    say "MiGA:#{p.[:name]} launched."
	    say "-----------------------------------"
	    loop_i = 0
	    loop do
  # Tell the world you're alive
  f = File.open(File.expand_path("daemon/alive", project.path),"w")
  f.print Time.now.to_s
  f.close
  loop_i += 1
  # Traverse datasets
  p.datasets.each do |ds|
     # Inspect preprocessing
		  to_run = ds.next_preprocessing
		  # Launch task
		  queue_job(to_run, ds) unless to_run.nil?
  end
  
  # Check if all the reference datasets are pre-processed.
  # If yes, check the project-level tasks
  if p.done_preprocessing?
		  to_run = p.next_distances
		  to_run = p.next_inclade if to_run.nil?
		  # Launch task
		  queue_job(to_run) unless to_run.nil?
  end
  
  # Run jobs
  flush!

  # Every 12 loops:
  if loop_i==12
		  say "Housekeeping for sanity"
		  loop_i = 0
		  # Check if running jobs are alive
		  purge!
		  # Reload project metadata (to add newly created datasets)
		  project.load
  end
  sleep(latency)
	    end
	 end
end

#default_optionsObject



32
33
34
35
# File 'lib/miga/daemon.rb', line 32

def default_options
   { dir_mode: :normal, dir: File.expand_path("daemon", project.path),
	    multiple: false, log_output: true }
end

#flush!Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/miga/daemon.rb', line 138

def flush!
	 # Check for finished jobs
	 self.jobs_running.select! do |job|
	    r = job[:ds].nil? ?
  self.project.add_result(job[:job]) :
  job[:ds].add_result(job[:job])
	    say "Completed pid:#{job[:pid]} for " + 
  "#{job[:ds].nil? ? "" : "#{job[:ds].name}:"}#{job[:job]}" unless
  r.nil?
	    r.nil?
	 end
	 
	 # Avoid single datasets hogging resources
	 @jobs_to_run.rotate! rand(@jobs_to_run.size)
	 
	 # Launch as many @jobs_to_run as possible
	 while jobs_running.size < maxjobs
	    break if jobs_to_run.empty?
	    job = self.jobs_to_run.shift
	    if runopts(:type) == "bash"
  job[:pid] = spawn job[:cmd]
  Process.detach job[:pid]
	    else
  job[:pid] = `#{job[:cmd]}`.gsub(/[\n\r]/,"")
	    end
	    @jobs_running << job
	    say "Spawned pid:#{job[:pid]} for " +
  "#{job[:ds].nil? ? "" : "#{job[:ds].name}:"}#{job[:job]}"
	 end
end

#get_job(job, ds = nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/miga/daemon.rb', line 127

def get_job(job, ds=nil)
	 if ds==nil
	    (@jobs_to_run + @jobs_running).select do |j|
  (j[:ds].nil?) and (j[:job]==job)
	    end.first
	 else
	    (@jobs_to_run + @jobs_running).select do |j|
  (not j[:ds].nil?) and (j[:ds].name==ds.name) and (j[:job]==job)
	    end.first
	 end
end

#last_aliveObject



29
30
31
# File 'lib/miga/daemon.rb', line 29

def last_alive
	 Daemon.last_alive project
end

#latencyObject



46
# File 'lib/miga/daemon.rb', line 46

def latency() runopts(:latency) ; end

#maxjobsObject



47
# File 'lib/miga/daemon.rb', line 47

def maxjobs() runopts(:maxjobs) ; end

#ppnObject



48
# File 'lib/miga/daemon.rb', line 48

def ppn() runopts(:ppn) ; end

#purge!Object



168
169
170
171
172
# File 'lib/miga/daemon.rb', line 168

def purge!
	 self.jobs_running.select! do |job|
	    `#{sprintf(runopts(:alive), job[:pid])}`.chomp.to_i == 1
	 end
end

#queue_job(job, ds = nil) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/miga/daemon.rb', line 102

def queue_job(job, ds=nil)
	 return nil unless get_job(job, ds).nil?
	 ds_name = (ds.nil? ? "miga-project" : ds.name)
	 say "Queueing ", ds_name, ":#{job}"
	 type = runopts(:type)
	 vars = {
	    "PROJECT"=>project.path, "RUNTYPE"=>runopts(:type), "CORES"=>ppn,
	    "MIGA"=>File.expand_path("../..", File.dirname(__FILE__)) }
	 vars["DATASET"] = ds.name unless ds.nil?
	 log_dir = File.expand_path("daemon/#{job}", project.path)
	 Dir.mkdir log_dir unless Dir.exist? log_dir
	 to_run = {ds: ds, job: job, cmd: sprintf(runopts(:cmd),
  # 1: script
  vars["MIGA"] + "/scripts/#{job.to_s}.bash",
  # 2: vars
  vars.keys.map{|k| sprintf(runopts(:var),k,vars[k])
		  }.join(runopts(:varsep)),
  # 3: CPUs
  ppn,
  # 4: log file
  File.expand_path("#{ds_name}.log", log_dir),
  # 5: task name
  "#{project.[:name][0..9]}:#{job}:#{ds_name}")}
	 @jobs_to_run << to_run
end

#restartObject



51
# File 'lib/miga/daemon.rb', line 51

def restart() daemon("restart") ; end

#runopts(k, v = nil) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/miga/daemon.rb', line 36

def runopts(k, v=nil)
	 k = k.to_sym
	 unless v.nil?
	    v = v.to_i if [:latency, :maxjobs, :ppn].include? k
	    raise "Daemon's #{k} cannot be set to zero." if
  v.is_a? Integer and v==0
	    @runopts[k] = v
	 end
	 @runopts[k]
end

#say(*opts) ⇒ Object



173
174
175
# File 'lib/miga/daemon.rb', line 173

def say(*opts)
	 print "[#{Time.new.inspect}] ", *opts, "\n"
end

#startObject



49
# File 'lib/miga/daemon.rb', line 49

def start() daemon("start") ; end

#statusObject



52
# File 'lib/miga/daemon.rb', line 52

def status() daemon("status") ; end

#stopObject



50
# File 'lib/miga/daemon.rb', line 50

def stop() daemon("stop") ; end