Class: BaseChip::Tasker
Constant Summary collapse
- CLEAR =
"\e[0K"
- @@last_client_id =
0
Instance Attribute Summary collapse
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#client_of ⇒ Object
Returns the value of attribute client_of.
-
#clusters ⇒ Object
Returns the value of attribute clusters.
-
#complete_tasks ⇒ Object
Returns the value of attribute complete_tasks.
-
#foreground ⇒ Object
Returns the value of attribute foreground.
-
#pending_tasks ⇒ Object
Returns the value of attribute pending_tasks.
-
#running_tasks ⇒ Object
Returns the value of attribute running_tasks.
-
#task_master ⇒ Object
Returns the value of attribute task_master.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
- #available_slots ⇒ Object
- #clear_line ⇒ Object
- #finish ⇒ Object
- #get_task(client_id) ⇒ Object
-
#initialize(*args) ⇒ Tasker
constructor
A new instance of Tasker.
- #inner_register_results(t, result, *additional) ⇒ Object
- #kill ⇒ Object
- #kill_message ⇒ Object
- #maintain_workers ⇒ Object
- #new_client_id ⇒ Object
- #ready ⇒ Object
- #register_results(t, result, *additional) ⇒ Object
- #run ⇒ Object
- #spawn_worker(command) ⇒ Object
- #status_line ⇒ Object
- #submit(task) ⇒ Object
- #submit_name(t_name, worker_command = nil) ⇒ Object
- #submit_workload_chain(chain) ⇒ Object
- #tasks_total ⇒ Object
- #workers_alive ⇒ Object
- #workers_running ⇒ Object
- #workers_spawned ⇒ Object
- #workers_stopped ⇒ Object
Methods included from Ipc
Constructor Details
#initialize(*args) ⇒ Tasker
Returns a new instance of Tasker.
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/base_chip/tasker.rb', line 41
# Builds an idle tasker: no workers, empty task lists, zeroed counters,
# a fresh mutex, and itself as the server.
#
# @param args [Array] accepted but not used by this constructor
def initialize(*args)
  @mutex   = Mutex.new
  @server  = self
  @workers = {}

  # Each task list is paired with a counter that starts at zero.
  @pending_tasks  = []
  @tasks_pending  = 0
  @running_tasks  = []
  @tasks_running  = 0
  @complete_tasks = []
  @tasks_complete = 0

  # Outcome tallies.
  @tasks_passed = 0
  @tasks_failed = 0
end
Instance Attribute Details
#client_id ⇒ Object
Returns the value of attribute client_id.
39 40 41 |
# File 'lib/base_chip/tasker.rb', line 39
# Reader for the +client_id+ attribute (the id #run hands to
# @server.get_task).
#
# @return [Object] current value of @client_id, nil until assigned
def client_id
  @client_id
end
#client_of ⇒ Object
Returns the value of attribute client_of.
38 39 40 |
# File 'lib/base_chip/tasker.rb', line 38
# Reader for the +client_of+ attribute. When it is truthy, #run and
# #register_results take their client-side branches.
#
# @return [Object] current value of @client_of, nil until assigned
def client_of
  @client_of
end
#clusters ⇒ Object
Returns the value of attribute clusters.
30 31 32 |
# File 'lib/base_chip/tasker.rb', line 30
# Reader for the +clusters+ attribute: the collection iterated by
# #available_slots and #spawn_worker. #maintain_workers treats a nil value
# as "no clusters" and registers a single local worker instead.
#
# @return [Object] current value of @clusters, nil until assigned
def clusters
  @clusters
end
#complete_tasks ⇒ Object
Returns the value of attribute complete_tasks.
36 37 38 |
# File 'lib/base_chip/tasker.rb', line 36
# Reader for the +complete_tasks+ attribute: the list that
# #inner_register_results appends finished or cancelled tasks to.
#
# @return [Object] current value of @complete_tasks
def complete_tasks
  @complete_tasks
end
#foreground ⇒ Object
Returns the value of attribute foreground.
28 29 30 |
# File 'lib/base_chip/tasker.rb', line 28
# Reader for the +foreground+ attribute. #status_line prints nothing while
# it is truthy, and #submit copies it onto each submitted task.
#
# @return [Object] current value of @foreground, nil until assigned
def foreground
  @foreground
end
#pending_tasks ⇒ Object
Returns the value of attribute pending_tasks.
34 35 36 |
# File 'lib/base_chip/tasker.rb', line 34
# Reader for the +pending_tasks+ attribute: tasks submitted but not yet
# handed to a worker by #get_task.
#
# @return [Object] current value of @pending_tasks
def pending_tasks
  @pending_tasks
end
#running_tasks ⇒ Object
Returns the value of attribute running_tasks.
35 36 37 |
# File 'lib/base_chip/tasker.rb', line 35
# Reader for the +running_tasks+ attribute: tasks that #get_task has handed
# to a worker and whose results have not been registered yet.
#
# @return [Object] current value of @running_tasks
def running_tasks
  @running_tasks
end
#task_master ⇒ Object
Returns the value of attribute task_master.
26 27 28 |
# File 'lib/base_chip/tasker.rb', line 26
# Reader for the +task_master+ attribute: the object this tasker calls
# back via tasker_run_task, tasker_handle_results and tasker_finish.
#
# @return [Object] current value of @task_master, nil until assigned
def task_master
  @task_master
end
#workers ⇒ Object
Returns the value of attribute workers.
32 33 34 |
# File 'lib/base_chip/tasker.rb', line 32
# Reader for the +workers+ attribute: a hash of worker records keyed by
# client id (each record is a hash with keys such as :state and :tasks).
#
# @return [Hash] current value of @workers
def workers
  @workers
end
Instance Method Details
#available_slots ⇒ Object
122 123 124 125 126 127 128 |
# File 'lib/base_chip/tasker.rb', line 122
# Adds up the free slots reported by every cluster.
#
# @return [Integer] sum of +available_slots+ over @clusters (0 when empty)
def available_slots
  @clusters.inject(0) { |total, cluster| total + cluster.available_slots }
end
#clear_line ⇒ Object
256 257 258 |
# File 'lib/base_chip/tasker.rb', line 256
# Writes the CLEAR escape sequence followed by a carriage return, without
# a trailing newline.
#
# @return [nil]
def clear_line
  print(Tasker::CLEAR + "\r")
end
#finish ⇒ Object
222 223 224 225 226 227 228 229 |
# File 'lib/base_chip/tasker.rb', line 222
# Shuts the tasker down once: kills workers, notifies the task master and
# stops the DRb service. Repeat calls are ignored via the @finishing flag.
#
# @return [Object, nil] result of DRb.stop_service on the first call, nil
#   on any later call
def finish
  unless @finishing
    @finishing = true
    kill
    @task_master.tasker_finish
    DRb.stop_service
  end
end
#get_task(client_id) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/base_chip/tasker.rb', line 183
# Hands the next ready task to the worker identified by +client_id+.
#
# The whole operation runs inside @mutex.synchronize so the lock is always
# released, including on the two error paths below (previously an explicit
# lock/unlock pair left the mutex held when either +raise+ fired).
#
# A ready task suits the worker when it has no worker_command, or when
# worker_command.call(0) equals the worker's recorded :worker_command.
#
# @param client_id [Object] key of the requesting worker in @workers
# @return [Object, nil] the task now assigned to the worker, or nil when
#   nothing suitable is ready (the worker is then marked :stopped)
# @raise [RuntimeError] if no worker is registered under +client_id+
# @raise [RuntimeError] if tasks remain pending but none is ready or running
def get_task(client_id)
  @mutex.synchronize do
    ready  = maintain_workers
    worker = @workers[client_id]
    raise "Could not find worker based on unique id #{client_id}" unless worker

    task = ready.find do |candidate|
      candidate.worker_command.nil? || candidate.worker_command.call(0) == worker[:worker_command]
    end

    if task
      # Move the task from pending to running and attach it to the worker.
      @pending_tasks.delete task
      @running_tasks << task
      @tasks_pending -= 1
      @tasks_running += 1
      worker[:tasks] << task
      worker[:state] = :running
    else
      # Nothing for this worker: retire it and give its cluster slot back.
      worker[:state] = :stopped
      worker[:cluster].slots_used -= 1 if worker[:cluster]
    end

    finish if @pending_tasks.empty? && @running_tasks.empty? && ready.empty?
    if @pending_tasks.size > 0 && @running_tasks.empty? && ready.empty?
      raise("Could not build tasks because their dependencies never cleared: " + (@pending_tasks.map{|t|t.task_name}.join(" ")))
    end
    status_line
    task
  end
end
#inner_register_results(t, result, *additional) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/base_chip/tasker.rb', line 89
# Records the outcome of task +t+ without taking @mutex (callers such as
# #register_results and #kill decide whether to lock).
#
# Tasks are matched by +task_name+. A task already in @complete_tasks is
# ignored. Otherwise the task is removed from whichever list actually holds
# it (pending first, then running), so cancelling a *running* task no
# longer decrements the pending counter and strands the task in
# @running_tasks. On 'pass', each follow-on task's wait_count drops by one;
# on any other result every follow-on task is cancelled recursively.
#
# NOTE(review): the extracted listing read `BaseChip..max_fails`, which
# does not parse; an identifier was lost. `BaseChip.options` is assumed
# here -- confirm against the original lib/base_chip/tasker.rb.
#
# @param t [Object] reported task; must respond to +task_name+
# @param result [String, Symbol] 'pass', :cancel, or anything else (failure)
# @param additional [Array] extra values forwarded to the task master
# @return [Object, nil] nil when the task was already complete; otherwise
#   the value of the final conditional +finish+
# @raise [RuntimeError] if the task is in neither the pending nor running list
def inner_register_results(t, result, *additional)
  return if @complete_tasks.find { |done| done.task_name == t.task_name }

  pending_match = @pending_tasks.find { |candidate| candidate.task_name == t.task_name }
  original_task = pending_match || @running_tasks.find { |candidate| candidate.task_name == t.task_name }
  raise "INTERNAL ERROR: couldn't find #{t.task_name}" unless original_task

  # Update the list and counter the task was really found in.
  if pending_match
    @tasks_pending -= 1
    @pending_tasks.delete original_task
  else
    @tasks_running -= 1
    @running_tasks.delete original_task
  end

  @complete_tasks << t
  @tasks_complete += 1

  if result == 'pass'
    @tasks_passed += 1
    original_task.next_tasks.each { |nt| nt.wait_count -= 1 }
  else
    # A failed or cancelled task cancels everything that depends on it.
    original_task.next_tasks.each { |nt| inner_register_results(nt, :cancel) }
    @tasks_failed += 1 unless result == :cancel
  end

  @task_master.tasker_handle_results(t, result, *additional)
  status_line
  finish if (BaseChip.options.max_fails  && @tasks_failed >= BaseChip.options.max_fails ) ||
            (BaseChip.options.max_passes && @tasks_passed >= BaseChip.options.max_passes)
end
#kill ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/base_chip/tasker.rb', line 231
# Kills every cluster-backed worker that is not stopped, then cancels all
# running and pending tasks.
#
# The mutex is taken opportunistically with try_lock; if it cannot be
# acquired, the method proceeds without it. When the lock was acquired it
# is now released in an +ensure+, so an exception from a cluster's +kill+
# or from the cancel loop can no longer leave @mutex locked.
#
# @return [Mutex, nil] @mutex if this call acquired (and released) it,
#   otherwise nil -- same as before the ensure was introduced
def kill
  locked = @mutex.try_lock
  begin
    @workers.each do |_id, worker|
      # NOTE(review): nothing visible here ever sets a :stopped key (state
      # is kept under :state); the check is kept as-is for compatibility.
      next if worker[:stopped]
      next unless worker[:cluster] && worker[:state] != :stopped
      worker[:cluster].kill(worker[:uid])
    end
    # Iterate over a combined copy: inner_register_results mutates both lists.
    (@running_tasks + @pending_tasks).each do |task|
      inner_register_results task, :cancel
    end
  ensure
    @mutex.unlock if locked
  end
  @mutex if locked
end
#kill_message ⇒ Object
245 246 247 248 249 |
# File 'lib/base_chip/tasker.rb', line 245
# Prints the one-time "closing down" notice. The @kill_message flag makes
# every call after the first a no-op.
#
# The method name was missing from the extracted listing (`def return if
# ...`); it is restored from the "#kill_message" heading of this entry.
#
# @return [nil]
def kill_message
  return if @kill_message
  @kill_message = true
  puts "#{Tasker::CLEAR}\rClosing down all workers. Please be patient."
end
#maintain_workers ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/base_chip/tasker.rb', line 141
# Finds the pending tasks whose wait_count is zero and makes sure workers
# exist to run them.
#
# With clusters: spawns min(ready tasks - workers still :spawned, free
# slots) new workers. The i-th new worker is launched with the i-th ready
# task's worker_command, and that same task's worker_command.call(0) is
# stored as the worker's type. Previously the launch always used
# ready[0]'s command while the stored type came from ready[i], so workers
# were mislabelled whenever ready tasks needed different commands.
#
# Without clusters: (re)registers a single local worker under id 0 when at
# least one task is ready.
#
# @return [Array] the ready tasks (used by #get_task)
# @raise [RuntimeError] if @workers already has an entry keyed by the uid
#   that spawn_worker returned
def maintain_workers
  ready = @pending_tasks.select { |t| t.wait_count == 0 }
  if @clusters
    slots   = available_slots
    missing = ready.size - workers_spawned
    missing = slots if slots < missing
    # Integer#times is a no-op for zero or negative counts.
    missing.times do |i|
      client_id = new_client_id
      command   = ready[i].worker_command
      cluster, uid = spawn_worker(command.call(client_id))
      # NOTE(review): @workers is keyed by client id, so this lookup by uid
      # only detects a uid that collides with a client id -- confirm intent.
      raise "spawn_worker response wasn't unique" if @workers[uid]
      @workers[client_id] = {:uid => uid, :cluster=> cluster, :started=>Time.now, :tasks=>[], :state=>:spawned, :worker_command=>command.call(0)}
      status_line
    end
  else
    @workers[0] = {:started=>Time.now, :tasks=>[], :state=>:running, :worker_command=>ready[0].worker_command} if ready[0]
  end
  ready
end
#new_client_id ⇒ Object
138 139 140 |
# File 'lib/base_chip/tasker.rb', line 138
# Advances the class-wide @@last_client_id counter by one and returns the
# new value.
#
# @return [Integer] the freshly allocated client id
def new_client_id
  @@last_client_id = @@last_client_id.succ
end
#ready ⇒ Object
58 59 60 |
# File 'lib/base_chip/tasker.rb', line 58
# Sets up this tasker's sockets in the :server role by delegating to
# init_test_sockets (not defined in this listing; presumably provided by
# the included Ipc module -- confirm).
#
# @return [Object] whatever init_test_sockets returns
def ready
  init_test_sockets :server
end
#register_results(t, result, *additional) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/base_chip/tasker.rb', line 80
# Reports the result of task +t+.
#
# As a client (@client_of set) the call is forwarded to @server and nil is
# returned. Otherwise the result is recorded locally under @mutex. The
# lock is taken with synchronize, so it is released even when
# #inner_register_results raises (the previous lock/unlock pair left the
# mutex held in that case).
#
# @param t [Object] the task being reported
# @param result [String, Symbol] outcome passed to #inner_register_results
# @param additional [Array] extra values forwarded unchanged
# @return [Mutex, nil] nil in client mode; otherwise @mutex, matching the
#   value the former trailing `@mutex.unlock` returned
def register_results(t, result, *additional)
  if @client_of
    @server.register_results(t, result, *additional)
    return
  end
  @mutex.synchronize { inner_register_results(t, result, *additional) }
  @mutex
end
#run ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/base_chip/tasker.rb', line 62
# Main loop of the tasker.
#
# With clusters and no @client_of, this only waits on the DRb thread.
# Otherwise (client mode, or no clusters at all) it pulls tasks from
# @server one at a time and hands each to the task master until get_task
# returns a falsy value. A DRb connection error ends the loop silently.
#
# @return [Object, nil] the value of DRb.thread.join when waiting on the
#   DRb thread, otherwise nil
def run
  STDOUT.sync = true
  return DRb.thread.join unless @client_of || @clusters.nil?

  begin
    init_test_sockets(:client) if @client_of
    while (task = @server.get_task(@client_id))
      @task_master.tasker_run_task task
    end
  rescue DRb::DRbConnError
    # raise if @options.debug
  end
end
#spawn_worker(command) ⇒ Object
129 130 131 132 133 134 135 136 |
# File 'lib/base_chip/tasker.rb', line 129
# Submits +command+ to the first cluster that reports a free slot, after
# bumping that cluster's slots_used by one.
#
# @param command [Object] passed unchanged to the cluster's +submit+
# @return [Array, Object] [cluster, submit_result] on success; when no
#   cluster has a free slot, the value of @clusters.each
def spawn_worker(command)
  @clusters.each do |cluster|
    next unless cluster.available_slots > 0
    cluster.slots_used += 1
    uid = cluster.submit(command)
    return [cluster, uid]
  end
end
#status_line ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/base_chip/tasker.rb', line 114
# Prints a one-line progress summary ending in a carriage return: task
# completion, worker utilisation, and pass/fail/run/pend counters.
# Nothing is printed while @foreground is truthy.
#
# @return [nil]
# @raise [RuntimeError] if tasks_total is zero
def status_line
  return if @foreground

  running = workers_running
  alive   = workers_alive
  busy_pct = alive > 0 ? (100*running/alive).round : 0
  raise "Workload is empty" if tasks_total == 0

  done_pct = (100*@tasks_complete/tasks_total).round
  progress = "complete:#{@tasks_complete}/#{tasks_total}(#{done_pct}%)"
  staffing = "workers:#{running}/#{alive}(#{busy_pct}%)"
  tallies  = "pass:#{@tasks_passed} fail:#{@tasks_failed} run:#{@tasks_running} pend:#{@tasks_pending}"
  print "#{Tasker::CLEAR}#{progress} #{staffing} #{tallies}\r"
end
#submit(task) ⇒ Object
170 171 172 173 174 175 176 |
# File 'lib/base_chip/tasker.rb', line 170
# Queues a single task: stamps it with this tasker's foreground setting,
# appends it to the pending list, then refreshes workers and the status
# line.
#
# @param task [Object] task to queue; must accept +foreground=+
# @return [Object] whatever #status_line returns
def submit(task)
  task.foreground = foreground
  @pending_tasks.push(task)
  @tasks_pending += 1
  maintain_workers
  status_line
end
#submit_name(t_name, worker_command = nil) ⇒ Object
164 165 166 167 168 169 |
# File 'lib/base_chip/tasker.rb', line 164
# Creates a bare Task carrying the given name and submits it. The worker
# command is attached only when clusters are configured.
#
# @param t_name [Object] value assigned to the task's task_name
# @param worker_command [Object, nil] stored on the task only if @clusters
#   is truthy
# @return [Object] whatever #submit returns
def submit_name(t_name,worker_command=nil)
  task = Task.new.tap do |fresh|
    fresh.task_name = t_name
    fresh.worker_command = worker_command if @clusters
  end
  submit(task)
end
#submit_workload_chain(chain) ⇒ Object
177 178 179 180 181 182 |
# File 'lib/base_chip/tasker.rb', line 177
# Replaces the pending list with +chain+ (the same object, not a copy),
# resets the pending counter to its size, then refreshes workers and the
# status line.
#
# @param chain [Array] tasks that become the new pending list
# @return [Object] whatever #status_line returns
def submit_workload_chain(chain)
  @pending_tasks, @tasks_pending = chain, chain.size
  maintain_workers
  status_line
end
#tasks_total ⇒ Object
54 55 56 |
# File 'lib/base_chip/tasker.rb', line 54
# Total number of tasks this tasker is tracking.
#
# @return [Integer] pending + running + complete counters
def tasks_total
  [@tasks_pending, @tasks_running, @tasks_complete].inject(:+)
end
#workers_alive ⇒ Object
251 |
# File 'lib/base_chip/tasker.rb', line 251
# Counts the workers whose :state is anything other than :stopped.
#
# @return [Integer]
def workers_alive
  @workers.each_value.count { |worker| worker[:state] != :stopped }
end
#workers_running ⇒ Object
252 |
# File 'lib/base_chip/tasker.rb', line 252
# Counts the workers whose :state is :running.
#
# @return [Integer]
def workers_running
  @workers.each_value.count { |worker| worker[:state] == :running }
end
#workers_spawned ⇒ Object
253 |
# File 'lib/base_chip/tasker.rb', line 253
# Counts the workers whose :state is :spawned, i.e. launched by
# #maintain_workers and not yet moved to :running or :stopped by #get_task.
#
# @return [Integer]
def workers_spawned
  @workers.each_value.count { |worker| worker[:state] == :spawned }
end
#workers_stopped ⇒ Object
254 |
# File 'lib/base_chip/tasker.rb', line 254
# Counts the workers whose :state is :stopped.
#
# @return [Integer]
def workers_stopped
  @workers.each_value.count { |worker| worker[:state] == :stopped }
end