Module: SimpleWS::Jobs::Scheduler

Includes:
Process
Defined in:
lib/simplews/jobs.rb

Overview

{{{ Scheduler

Defined Under Namespace

Classes: Job

Constant Summary collapse

@@task_results =
{}
@@names =
[]
@@pids =
{}
@@queue =
[]
@@max_jobs =
3

Class Method Summary collapse

Class Method Details

.abort(name) ⇒ Object


169
170
171
# File 'lib/simplews/jobs.rb', line 169

# Sends the INT signal to the process recorded in @@pids for the job +name+.
# Does nothing (and returns nil) when no pid is recorded under that name.
def self.abort(name)
  pid = @@pids[name]
  Process.kill("INT", pid) if pid
end

.abort_jobs ⇒ Object


173
174
175
176
177
# File 'lib/simplews/jobs.rb', line 173

# Sends the INT signal to every process id currently recorded in @@pids.
#
# Hash#values does not yield to a block, so the pids must be iterated
# explicitly with #each; otherwise no signal is ever sent.
#
# Returns the array of pids that were signalled.
def self.abort_jobs
  @@pids.values.each { |pid|
    Process.kill "INT", pid
  }
end

.clean_job(pid) ⇒ Object


127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/simplews/jobs.rb', line 127

# Bookkeeping for a child process that has exited.
#
# Looks up which job name is recorded in @@pids for +pid+; returns nil
# without doing anything if none is. Otherwise it logs the exit status,
# marks the job as :error when its saved status is not one of
# :error, :done or :aborted, and removes the job from @@pids.
#
# Reads $?, so it is expected to run right after the child was reaped
# (job_monitor calls it straight after Process.wait).
def self.clean_job(pid)
  entry = @@pids.find { |_job, job_pid| job_pid == pid }
  return unless entry

  name = entry.first
  puts "Job #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}"

  final_states = [:error, :done, :aborted]
  state = Job.job_info(name)
  unless final_states.include?(state[:status])
    # The process ended without recording a final status of its own.
    state[:status] = :error
    state[:messages] << "Job finished for unknown reasons"
    Job.save(name, state)
  end

  @@pids.delete(name)
end

.configure(name, value) ⇒ Object


80
81
82
# File 'lib/simplews/jobs.rb', line 80

# Forwards a configuration setting (+name+, +value+) to Job.configure and
# returns whatever that call returns.
def self.configure(name, value)
  Job.configure(name, value)
end

.dequeue ⇒ Object


93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/simplews/jobs.rb', line 93

# Starts the next queued job if there is capacity.
#
# A job is started only when fewer than @@max_jobs pids are being tracked
# and the queue is not empty. The value returned by Job#run is recorded in
# @@pids under the job's name and returned; otherwise returns nil.
def self.dequeue
  return nil unless @@pids.length < @@max_jobs && @@queue.any?

  # NOTE(review): entries are appended with push (see run) and removed here
  # with pop, so the most recently queued job starts first — confirm that
  # LIFO order is intended.
  entry = @@queue.pop
  task = entry[:task]
  name = entry[:name]

  pid = Job.new.run(task, name, @@task_results[task], *entry[:args])

  @@pids[name] = pid
  pid
end

.helper(name, block) ⇒ Object


84
85
86
# File 'lib/simplews/jobs.rb', line 84

# Defines an instance method called +name+ on Job whose body is +block+
# (a proc/method object accepted by define_method).
def self.helper(name, block)
  Job.send(:define_method, name, block)
end

.job_info(name) ⇒ Object


179
180
181
# File 'lib/simplews/jobs.rb', line 179

# Returns whatever Job.job_info returns for the job +name+ (elsewhere in
# this module that value is used as a state hash with :status and
# :messages keys).
def self.job_info(name)
  Job.job_info(name)
end

.job_monitor ⇒ Object


141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/simplews/jobs.rb', line 141

# Spawns and returns the background thread that drives the scheduler.
#
# Each iteration tries to start a queued job via dequeue. When nothing was
# started and jobs are being tracked, it polls (non-blocking) for any
# finished child and hands its pid to clean_job, then loops again
# immediately. In every other case it sleeps for
# SimpleWS::Jobs::SLEEP_TIMES[:monitor]. Errors are printed and followed by
# a 2 second pause so the loop keeps running.
def self.job_monitor
  Thread.new do
    loop do
      begin
        if dequeue.nil? && @@pids.any?
          # WNOHANG: returns nil instead of blocking when no child has exited.
          finished_pid = Process.wait(-1, Process::WNOHANG)
          if finished_pid
            clean_job(finished_pid)
            next
          end
        end

        sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor]
      rescue
        puts $!.message
        puts $!.backtrace.join("\n")
        sleep 2
      end
    end
  end
end

.job_results(name) ⇒ Object


187
188
189
# File 'lib/simplews/jobs.rb', line 187

# Returns whatever Job.results returns for the job +name+.
def self.job_results(name)
  Job.results(name)
end

.make_name(name = "") ⇒ Object


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/simplews/jobs.rb', line 56

# Derives a job name from the suggested +name+ that does not clash with
# names already handed out.
#
# A suggestion without any word character is replaced by a random
# "job-..." name. Names considered taken are the entries of @@names equal
# to the name (optionally followed by "-<digits>") plus whatever
# Job.taken(name) reports.
#
# Returns the name unchanged when nothing is taken, "<name>-2" when exactly
# one name is taken, and otherwise "<name>-<n>" where n is one more than
# the largest trailing number among the taken names (names with no trailing
# "-<digits>" count as 1).
def self.make_name(name = "")
  name = Scheduler::random_name("job-") unless name =~ /\w/

  same_base = /^#{ Regexp.quote name }(?:-\d+)?$/
  taken = (@@names.select { |n| n =~ same_base } + Job.taken(name)).sort.uniq

  return name unless taken.any?
  return name + '-2' if taken.length == 1

  highest = taken.collect { |s| s.match(/-(\d+)$/) ? $1.to_i : 1 }.max
  name + '-' + (highest + 1).to_s
end

.queue ⇒ Object


106
107
108
# File 'lib/simplews/jobs.rb', line 106

# Returns the @@queue array itself (not a copy); run appends job entries
# to it and dequeue removes them.
def self.queue
  @@queue
end

.queue_size(size) ⇒ Object


37
38
39
# File 'lib/simplews/jobs.rb', line 37

# Sets @@max_jobs to +size+. dequeue only starts a job while fewer than
# @@max_jobs pids are being tracked, so despite the method name this caps
# the number of jobs started at once rather than the length of the queue.
def self.queue_size(size)
  @@max_jobs = size
end

.random_name(s = "", num = 20) ⇒ Object


42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/simplews/jobs.rb', line 42

# Builds a random alphanumeric name.
#
# s   - prefix for the result (default ""); it is copied, never modified.
# num - number of random characters to append (default 20).
#
# Each appended character is a digit with probability 0.3, a lowercase
# letter with probability 0.3 and an uppercase letter otherwise.
#
# Characters are picked from explicit character lists instead of doing
# arithmetic on character literals (?a, ?z): those literals are Integers
# only on Ruby 1.8 and Strings on later versions. Picking from the lists
# also makes 'z' and 'Z' reachable.
#
# Returns a new String of length s.to_s.length + num.
def self.random_name(s="", num=20)
  digits = ('0'..'9').to_a
  lower  = ('a'..'z').to_a
  upper  = ('A'..'Z').to_a

  # Work on a copy so frozen prefixes are accepted and the caller's string
  # is left untouched.
  name = s.to_s.dup
  num.times {
    r = rand
    pool = if r < 0.3
             digits
           elsif r < 0.6
             lower
           else
             upper
           end
    name << pool[rand(pool.length)]
  }
  name
end

.run(task, *args) ⇒ Object


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/simplews/jobs.rb', line 110

# Queues +task+ for execution and returns the job name assigned to it.
#
# The last element of +args+ is taken as the suggested job name and is
# turned into a unique name by make_name; the remaining elements are stored
# as the arguments for the task. The name is recorded in @@names, the job
# entry is appended to @@queue and an initial :queued state is saved
# through Job.save.
def self.run(task, *args)
  # Plain pop, not `*args.pop`: on Ruby 1.9+ a splat on the right-hand side
  # of an assignment produces an Array, which never matches /\w/ in
  # make_name, so the suggested name would always be discarded.
  suggested_name = args.pop
  name = make_name(suggested_name)
  @@names << name

  @@queue.push( {:name => name, :task => task, :args => args})
  state = {
      :name => name,
      :status => :queued,
      :messages => [],
      :info => {},
  }
  Job.save(name,state)

  name
end

.task(name, results, block) ⇒ Object


88
89
90
91
# File 'lib/simplews/jobs.rb', line 88

# Registers a task: stores +results+ in @@task_results under +name+
# (dequeue passes that value on to Job#run) and defines an instance method
# +name+ on Job whose body is +block+.
def self.task(name, results, block)
  @@task_results[name] = results
  Job.send(:define_method, name, block)
end

.workdir=(workdir) ⇒ Object


183
184
185
# File 'lib/simplews/jobs.rb', line 183

# Forwards +workdir+ to Job.workdir=.
def self.workdir=(workdir)
  Job.workdir = workdir
end