Class: ScbiMapreduce::WorkManager
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- ScbiMapreduce::WorkManager
- Includes:
- EM::P::ObjectProtocol
- Defined in:
- lib/scbi_mapreduce/work_manager.rb
Overview
require 'json'
Class Method Summary
- .checkpoint ⇒ Object
- .controlled_exit ⇒ Object
- .end_work_manager ⇒ Object
- .get_checkpoint ⇒ Object
- .global_error_received(error_exception) ⇒ Object
- .init_work_manager ⇒ Object
- .init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size) ⇒ Object
- .save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json') ⇒ Object
- .stats ⇒ Object
- .work_manager_finished ⇒ Object
Instance Method Summary
- #checkpointable_job_received(obj) ⇒ Object
- #each_transmission_time(worker, time) ⇒ Object
- #each_worker_time(worker, time) ⇒ Object
- #error_received(worker_error, obj) ⇒ Object
-
#goto_checkpoint ⇒ Object
Loads a checkpoint.
-
#initialize(*args) ⇒ WorkManager
constructor
A new instance of WorkManager.
-
#load_user_checkpoint(checkpoint) ⇒ Object
If this method returns -1, the checkpoint is restored automatically.
- #mean_time(h) ⇒ Object
- #next_work ⇒ Object
- #post_init ⇒ Object
- #print_running_jobs ⇒ Object
- #read_until_checkpoint(checkpoint) ⇒ Object
- #receive_object(obj) ⇒ Object
- #remove_checkpoint ⇒ Object
- #save_checkpoint ⇒ Object
- #save_user_checkpoint ⇒ Object
- #send_initial_config ⇒ Object
-
#send_next_work ⇒ Object
Sends the next work chunk to the worker.
- #send_stuck_work ⇒ Object
- #stop_work_manager ⇒ Object
- #too_many_errors_received ⇒ Object
- #trash_checkpointed_work ⇒ Object
-
#unbind ⇒ Object
Called when a worker has disconnected.
- #work_received(obj) ⇒ Object
- #worker_initial_config ⇒ Object
Constructor Details
#initialize(*args) ⇒ WorkManager
Returns a new instance of WorkManager.
638 639 640 641 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 638
# Creates the manager object for one worker connection (the original author
# notes there is one instance per worker). All shared bookkeeping lives in
# class variables, so this only forwards its arguments, and any block, to
# EventMachine::Connection.
def initialize(*args)
  super(*args)
end
Class Method Details
.checkpoint ⇒ Object
292 293 294 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 292
# Checkpoint number detected by .init_work_manager_internals. It is 0 when
# checkpointing is off or no checkpoint was found, and #goto_checkpoint
# resets it to 0 once applied.
def self.checkpoint
  @@checkpoint
end
.controlled_exit ⇒ Object
504 505 506 507 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 504
# Requests a graceful shutdown. Nothing is interrupted immediately; each
# worker is sent :quit the next time #send_next_work runs for it.
def self.controlled_exit
  $SERVER_LOG.info "Controlled exit. Workers will be noticed in next round"
  @@want_to_exit = true
end
.end_work_manager ⇒ Object
147 148 149 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 147
# Hook run by #stop_work_manager right after EventMachine is stopped and
# before statistics are computed. Override in a subclass; default: no-op.
def self.end_work_manager
  nil
end
.get_checkpoint ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 322
# Reads the checkpoint number stored in CHECKPOINT_FILE.
#
# @return [Integer] the stored checkpoint, or 0 when the file is missing,
#   cannot be read, or does not start with a number.
def self.get_checkpoint
  # File.exist? rather than the File.exists? alias, which Ruby 3.2 removed.
  # The NoMethodError it raised was swallowed by the rescue below, so a
  # checkpoint on disk was silently ignored.
  return 0 unless File.exist?(CHECKPOINT_FILE)

  File.read(CHECKPOINT_FILE).chomp.to_i
rescue StandardError
  # best effort: an unreadable checkpoint means "start from the beginning"
  0
end
.global_error_received(error_exception) ⇒ Object
155 156 157 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 155
# Hook called by #send_next_work with the exception raised while building
# the next chunk through #next_work. The default ignores it.
def self.global_error_received(error_exception)
  nil
end
.init_work_manager ⇒ Object
143 144 145 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 143
# Class-level set-up hook for subclasses; the default does nothing.
# NOTE(review): its caller is not in this file, presumably server start-up.
def self.init_work_manager
  nil
end
.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 214
# Resets all shared (class-level) state for a new run, stores the run
# options and logs them.
#
# @param checkpointing truthy to resume from / write CHECKPOINT_FILE
# @param keep_order truthy to write results in the order chunks were sent
# @param retry_stuck_jobs truthy to re-send chunks that report being stuck
# @param exit_on_many_errors truthy to request a controlled exit when most objects fail
# @param chunk_size number of objects sent to a worker per chunk
def self.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size)
  # run counters and bookkeeping
  @@count = @@retried_jobs = @@sent_chunks = @@received_objects = 0
  @@chunk_count = @@workers = @@max_workers = @@error_count = 0
  @@stats = {}
  @@want_to_exit = false
  @@running_jobs = []

  # run options
  @@checkpointing = checkpointing
  @@keep_order = keep_order
  @@retry_stuck_jobs = retry_stuck_jobs
  @@exit_on_many_errors = exit_on_many_errors
  # TODO - Implement a dynamic chunk_size
  @@chunk_size = chunk_size

  [
    "Processing in chunks of #{@@chunk_size} objects",
    "Checkpointing: #{@@checkpointing}",
    "Keeping output order: #{@@keep_order}",
    "Retrying stuck jobs: #{@@retry_stuck_jobs}",
    "Exiting on too many errors: #{@@exit_on_many_errors}"
  ].each { |line| $SERVER_LOG.info line }

  @@checkpoint = 0
  if @@checkpointing
    @@checkpoint = get_checkpoint
    $SERVER_LOG.info "Detected checkpoint at #{@@checkpoint}"
  end

  # statistics accumulators
  @@each_worker_time = {}
  @@each_transmission_time = {}
  @@total_seconds = @@total_manager_time = 0
  @@total_read_time = @@total_write_time = 0
end
.save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json') ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 202
# Writes run statistics to +filename+ as pretty-printed JSON.
#
# @param stats [Hash, nil] statistics to write; nil writes the hash built by
#   the last #stop_work_manager run (@@stats).
# @param filename [String] destination path, overwritten if it exists.
# @return [nil]
def self.save_stats(stats=nil, filename='scbi_mapreduce_stats.json')
  data = stats.nil? ? @@stats : stats
  # Generate first, so a serialisation error does not truncate an existing
  # file. The block form then closes the handle even if the write raises.
  json = JSON.pretty_generate(data)
  File.open(filename, 'w') { |f| f.puts json }
end
.stats ⇒ Object
198 199 200 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 198
# Statistics hash. It is reset to {} by .init_work_manager_internals and
# filled by #stop_work_manager at the end of a run.
def self.stats
  @@stats
end
.work_manager_finished ⇒ Object
151 152 153 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 151
# Hook run as the very last step of #stop_work_manager, after the summary
# has been logged. Override in a subclass; default: no-op.
def self.work_manager_finished
  nil
end
Instance Method Details
#checkpointable_job_received(obj) ⇒ Object
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 567 def checkpointable_job_received(obj) # find reveived object between sent jobs received_job=@@running_jobs.find{|o| o.job_identifier==obj.job_identifier} # save job if there is was a valid work previously sent if received_job # change this job's status to received, already done in previous method received_job.update_with_received!(obj) # # if there are sufficient jobs, count pending ones # if (@@running_jobs.count>=PENDING_TO_SAVE) # count received objects pending to be written, only until one that is still running is found pending_to_save=0 @@running_jobs.each do |job| if job.status==:received pending_to_save += 1 else break end end # if there are a few pending to save works, or all remaining works are pending, then save if (pending_to_save>=PENDING_TO_SAVE) || (pending_to_save==@@running_jobs.count) # save pending jobs and write to disk to_remove = 0 if @@checkpointing remove_checkpoint end @@running_jobs.each do |job| if job.status==:received # puts "Sent to save: #{job.inspect}" t=Time.now_us work_received(job.data) @@received_objects+=job.data.count @@total_write_time+=(Time.now_us - t) job.status=:saved to_remove += 1 else break end end # if some objects were saved, remove them from the running_jobs if to_remove > 0 to_remove.times do |i| o=@@running_jobs.shift # puts "Job removed #{o.inspect}" o=nil end # print_running_jobs if @@checkpointing && !@@want_to_exit save_checkpoint end end end # end else $SERVER_LOG.warn "Job already processed #{obj.inspect}" end end |
#each_transmission_time(worker, time) ⇒ Object
284 285 286 287 288 289 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 284
# Adds +time+ to the running transmission-time total of +worker+. The first
# sample for a worker starts from 0. The totals feed the statistics of
# #stop_work_manager.
def each_transmission_time(worker, time)
  previous = @@each_transmission_time[worker]
  previous = 0 if previous.nil?
  @@each_transmission_time[worker] = previous + time
end
#each_worker_time(worker, time) ⇒ Object
277 278 279 280 281 282 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 277
# Adds +time+ to the running processing-time total of +worker+. The first
# sample for a worker starts from 0. The totals feed the statistics of
# #stop_work_manager.
def each_worker_time(worker, time)
  previous = @@each_worker_time[worker]
  previous = 0 if previous.nil?
  @@each_worker_time[worker] = previous + time
end
#error_received(worker_error, obj) ⇒ Object
171 172 173 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 171
# Hook called by #receive_object for every error a worker reports. It gets
# the exception object and the data of the chunk that failed. Default: no-op.
def error_received(worker_error, obj)
  nil
end
#goto_checkpoint ⇒ Object
Loads a checkpoint.
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 448
# Loads a checkpoint: skips the input already handled by a previous run and
# restarts job numbering from there. #load_user_checkpoint decides how:
# * -1: automatic. @@checkpoint - 1 chunks are read through #next_work and
#   discarded, and numbering restarts at @@checkpoint.
# * > 0: the user restored the input; numbering restarts at that value.
# * 0: input and numbering are left untouched.
# @@checkpoint is reset to 0 afterwards, so this runs at most once.
def goto_checkpoint
  return unless @@checkpoint > 0

  $SERVER_LOG.info "Skipping until checkpoint #{@@checkpoint}"
  restored = load_user_checkpoint(@@checkpoint)

  if restored == -1
    # automatic restore: pull every already-processed chunk and drop it
    1.upto(@@checkpoint - 1) do |chunk_number|
      $SERVER_LOG.info "Automatic trashing Chunk #{chunk_number}"
      @@chunk_size.times { next_work }
      # trash_checkpointed_work
    end
    $SERVER_LOG.info "Automatic checkpoint finished"
    WorkManagerData.job_id = @@checkpoint
  elsif restored > 0
    # the user already repositioned the input
    WorkManagerData.job_id = restored
  elsif restored == 0
    $SERVER_LOG.info "Automatic checkpoint not done"
  end

  @@checkpoint = 0
end
#load_user_checkpoint(checkpoint) ⇒ Object
If this method returns -1, the checkpoint is restored automatically. Return 0 to skip checkpoint restoration, or return the restored checkpoint number to resume from that point.
186 187 188 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 186
# Hook for user-controlled checkpoint restore, called by #goto_checkpoint
# with the detected checkpoint number. Return -1 for automatic restore (this
# default), 0 to skip restoring, or the restored checkpoint number.
def load_user_checkpoint(checkpoint)
  -1
end
#mean_time(h) ⇒ Object
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 262
# Mean of the values of +h+ (used for per-worker time totals).
# When the sum is not positive it is returned as-is, without dividing; an
# empty hash therefore yields the Integer 0.
def mean_time(h)
  total = h.values.inject(0) { |acc, value| acc + value }
  total > 0 ? total / h.size.to_f : total
end
#next_work ⇒ Object
159 160 161 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 159
# Hook returning the next input object. #send_next_work and #goto_checkpoint
# call it repeatedly, and nil means "no more data". Default: no data.
def next_work
  nil
end
#post_init ⇒ Object
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 485
# EventMachine callback for a new worker connection. Counts the worker. The
# first connection also starts the run clock and, with checkpointing on,
# applies any pending checkpoint. The worker then receives its initial
# configuration followed by its first piece of work.
def post_init
  @@workers += 1
  @@max_workers += 1

  first_connection = (@@workers == 1)
  if first_connection
    @@total_seconds = Time.now_us
    $SERVER_LOG.info "First worker connected"
    if @@checkpointing
      $SERVER_LOG.info "Checking for checkpoint"
      goto_checkpoint
    end
  end

  $SERVER_LOG.info "#{@@workers} workers connected"
  send_initial_config
  send_next_work
end
#print_running_jobs ⇒ Object
350 351 352 353 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 350
# Logs (at debug level) one inspect line per tracked job.
def print_running_jobs
  $SERVER_LOG.debug("Running Jobs:\n#{@@running_jobs.map(&:inspect).join("\n")}")
end
#read_until_checkpoint(checkpoint) ⇒ Object
179 180 181 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 179
# Overridable hook; the default does nothing. No caller is visible in this
# file. NOTE(review): confirm whether it is still used.
def read_until_checkpoint(checkpoint)
  nil
end
#receive_object(obj) ⇒ Object
510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 510
# ObjectProtocol callback for every object a worker sends back:
# * an Exception for a failed chunk (carrying +worker_id+, +object+ and
#   +original_exception+). It is logged, counted and passed to
#   #error_received. Once more than 100 objects were sent and errors reach
#   80% of them, a controlled exit is requested if configured or if
#   #too_many_errors_received returns truthy.
# * :waking_up, from a worker that had been told to sleep.
# * a finished chunk. Its data is written through #work_received, directly
#   or via #checkpointable_job_received when jobs are tracked, and its timing
#   statistics are accumulated.
# In every case the worker is then sent its next piece of work.
def receive_object(obj)
  # check if response is an error
  if obj.is_a?(Exception)
    original = obj.original_exception
    # backtrace is nil for an exception that was never raised
    $SERVER_LOG.error("Error in worker #{obj.worker_id} while processing object #{obj.object.inspect}\n" \
                      "#{original.message}:\n#{Array(original.backtrace).join("\n")}")
    @@error_count += 1
    error_received(obj, obj.object.data)

    # if there are too many errors
    if (@@count > 100) && (@@error_count >= @@count * 0.8)
      # notice programmer
      res = too_many_errors_received
      # force exit if configured, or if too_many_errors_received returns truthy
      if @@exit_on_many_errors || res
        $SERVER_LOG.error("Want to exit due to too many errors")
        # controlled_exit is a class method; calling it on the instance raised NoMethodError
        self.class.controlled_exit
      end
    end
  elsif obj == :waking_up
    $SERVER_LOG.info("Worker woke up")
  else
    obj.received!(obj.data)

    if @@checkpointing || @@keep_order || @@retry_stuck_jobs
      # tracked mode: results are written in send order
      checkpointable_job_received(obj)
    else
      # untracked mode: write the result straight away
      t = Time.now_us
      work_received(obj.data)
      @@received_objects += obj.data.count
      @@total_write_time += (Time.now_us - t)
    end

    each_worker_time(obj.worker_identifier, obj.worker_time)
    each_transmission_time(obj.worker_identifier, obj.transmission_time)
  end

  # free mem: drop this reference before the next chunk is built
  obj = nil
  send_next_work
end
#remove_checkpoint ⇒ Object
296 297 298 299 300 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 296
# Moves the current checkpoint file aside to OLD_CHECKPOINT_FILE, replacing
# any file already there. It does nothing when no checkpoint file exists.
def remove_checkpoint
  # File.exist?: the File.exists? alias was removed in Ruby 3.2
  FileUtils.mv(CHECKPOINT_FILE, OLD_CHECKPOINT_FILE) if File.exist?(CHECKPOINT_FILE)
end
#save_checkpoint ⇒ Object
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 303
# Writes the current checkpoint to CHECKPOINT_FILE and then calls
# #save_user_checkpoint. The checkpoint is the job_identifier of the oldest
# tracked job or, when none is tracked, WorkManagerData.job_id.
def save_checkpoint
  checkpoint_value =
    if @@running_jobs.empty?
      WorkManagerData.job_id
    else
      @@running_jobs.first.job_identifier
    end

  $SERVER_LOG.info "Saving checkpoint: #{checkpoint_value}"
  # The value is computed before the file is opened (and truncated), and the
  # block form closes the handle even if the write raises.
  File.open(CHECKPOINT_FILE, 'w') { |f| f.puts checkpoint_value }

  save_user_checkpoint
end
#save_user_checkpoint ⇒ Object
190 191 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 190
# Hook called at the end of #save_checkpoint so subclasses can persist their
# own checkpoint state. Default: no-op.
def save_user_checkpoint
  nil
end
#send_initial_config ⇒ Object
338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 338
# Sends the worker its start-up configuration. This is
# {:initial_config => config} when #worker_initial_config returns non-nil,
# and :no_initial_config otherwise.
def send_initial_config
  config = worker_initial_config
  send_object(config.nil? ? :no_initial_config : { initial_config: config })
end
#send_next_work ⇒ Object
Sends the next work chunk to the worker.
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 382
# Sends this worker exactly one reply:
# * :quit when a controlled exit was requested;
# * a re-sent stuck chunk (see #send_stuck_work);
# * a new WorkManagerData chunk of up to @@chunk_size objects from
#   #next_work, including a partial chunk read before #next_work raised;
# * :sleep when nothing new was read but chunks are still running, or when
#   #next_work raised before producing anything;
# * :quit when the input is exhausted and nothing is running.
def send_next_work
  # if we need to exit, send quit to workers
  if @@want_to_exit
    send_object(:quit)
    return
  end

  # a stuck chunk was re-sent: that is this worker's reply
  return if send_stuck_work

  objs = []
  read_failed = false
  t = Time.now_us
  begin
    # prepare new data
    @@chunk_size.times do
      obj = next_work
      break if obj.nil?

      objs << obj
    end
  rescue StandardError => e
    # StandardError (not Exception) so signals and exit requests still stop
    # the server. No reply is sent here; one is chosen below, so the worker
    # never receives two messages for one request.
    $SERVER_LOG.error("Exception creating next_work. Worker, quit!")
    read_failed = true
    self.class.global_error_received(e)
  end
  @@total_read_time += (Time.now_us - t)

  if !objs.empty?
    # if new data was collected, send it
    @@count += objs.count
    @@chunk_count += 1
    work_data = WorkManagerData.new(objs)
    send_object(work_data)
    @@sent_chunks += 1

    # Track the chunk under the same condition #receive_object uses to route
    # results through #checkpointable_job_received. Otherwise (checkpointing
    # alone) the result is not found and is dropped as "already processed".
    # The data is kept so the chunk can be re-sent.
    if @@checkpointing || @@keep_order || @@retry_stuck_jobs
      @@running_jobs.push work_data
    end
  elsif read_failed
    # next_work failed before producing data: park the worker instead of quitting it
    send_object(:sleep)
  elsif @@running_jobs.count > 0
    $SERVER_LOG.info("Worker, go to sleep")
    send_object(:sleep)
  else
    # send a quit value indicating no more data available
    send_object(:quit)
  end
end
#send_stuck_work ⇒ Object
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 355
# When retry_stuck_jobs is on, re-sends the first tracked chunk whose
# stuck? is true. The chunk is marked sent again and the retry is counted.
#
# @return [Boolean] true when a stuck chunk was sent to this worker.
def send_stuck_work
  return false unless @@retry_stuck_jobs

  stuck = @@running_jobs.select(&:stuck?)
  return false if stuck.empty?

  $SERVER_LOG.info("Stuck Jobs:\n#{stuck.map(&:inspect).join("\n")}")
  job = stuck.first
  job.sent!
  send_object(job)
  @@sent_chunks += 1
  @@retried_jobs += 1
  $SERVER_LOG.info("Sending stuck work #{job.inspect}")
  true
end
#stop_work_manager ⇒ Object
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 660
# Shuts the server down (called by #unbind when the last worker has gone):
# 1. stops EventMachine and runs the end_work_manager hook;
# 2. builds the @@stats hash (see .stats / .save_stats);
# 3. logs a summary and runs the work_manager_finished hook.
def stop_work_manager
  EM.stop
  $SERVER_LOG.info "Exiting server"

  self.class.end_work_manager

  # @@total_seconds held the start time recorded when the first worker connected
  @@total_seconds = (Time.now_us - @@total_seconds)

  mean_worker_time = mean_time(@@each_worker_time)
  mean_transmission_time = mean_time(@@each_transmission_time)
  idle_time = (@@total_seconds - @@total_read_time - @@total_write_time - mean_transmission_time)
  processing_rate = (@@count / @@total_seconds.to_f)

  @@stats = {
    total_objects: @@count,
    total_seconds: @@total_seconds,
    sent_chunks: @@sent_chunks,
    received_objects: @@received_objects,
    processing_rate: processing_rate,
    total_read_time: @@total_read_time,
    total_write_time: @@total_write_time,
    mean_worker_time: mean_worker_time,
    mean_transmission_time: mean_transmission_time,
    total_manager_idle_time: idle_time,
    error_count: @@error_count,
    retried_jobs: @@retried_jobs,
    chunk_size: @@chunk_size,
    connected_workers: @@max_workers,
    each_transmission_time: @@each_transmission_time,
    each_worker_time: @@each_worker_time
  }

  $SERVER_LOG.info "Total processed: #{@@count} objects in #{@@total_seconds} seconds"
  # the value counted here is chunks, not objects
  $SERVER_LOG.info "Total sent chunks: #{@@sent_chunks} chunks"
  $SERVER_LOG.info "Total sent objects: #{@@count} objects"
  $SERVER_LOG.info "Total received objects: #{@@received_objects} objects"
  $SERVER_LOG.info "Processing rate: #{"%.2f" % processing_rate} objects per second"
  $SERVER_LOG.info "Connection rate: #{"%.2f" % (@@chunk_count / @@total_seconds.to_f)} connections per second"
  $SERVER_LOG.info "Total read time #{@@total_read_time} seconds"
  $SERVER_LOG.info "Total write time #{@@total_write_time} seconds"
  # mean_worker_time=mean_worker_time/@@max_workers
  $SERVER_LOG.info "Total worker time #{mean_worker_time} seconds"
  $SERVER_LOG.info "Total transmission time #{mean_transmission_time} seconds"
  $SERVER_LOG.info "Total manager_idle time #{idle_time} seconds"
  # $SERVER_LOG.info "Total manager time #{@@total_read_time + @@total_write_time + mean_transmission_time} seconds"
  $SERVER_LOG.info "Number of errors: #{@@error_count}"
  $SERVER_LOG.info "Number of retried stuck jobs: #{@@retried_jobs}"
  $SERVER_LOG.info "Chunk size: #{@@chunk_size}"
  $SERVER_LOG.info "Total connected workers: #{@@max_workers}"

  self.class.work_manager_finished
end
#too_many_errors_received ⇒ Object
175 176 177 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 175
# Hook called by #receive_object once more than 100 objects were sent and
# errors reach 80% of them. A truthy return requests a controlled exit.
# Default: nil, so no exit is forced by the hook.
def too_many_errors_received
  nil
end
#trash_checkpointed_work ⇒ Object
193 194 195 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 193
# Overridable hook; the default does nothing. Its only visible call site (in
# #goto_checkpoint) is commented out.
def trash_checkpointed_work
  nil
end
#unbind ⇒ Object
Called when a worker has disconnected.
644 645 646 647 648 649 650 651 652 653 654 655 656 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 644
# EventMachine callback for a closed worker connection. When the last
# worker has gone, the whole work manager is stopped.
def unbind
  @@workers -= 1
  #puts @@running_jobs.to_json
  $SERVER_LOG.info "Worker disconnected. #{@@workers} kept running"
  return unless @@workers.zero?

  # no more workers left, shutdown EM and stop server
  $SERVER_LOG.info "All workers finished"
  stop_work_manager
end
#work_received(obj) ⇒ Object
163 164 165 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 163
# Hook receiving the data of each finished chunk (from #receive_object or
# #checkpointable_job_received). Override to store the results; default: no-op.
def work_received(obj)
  nil
end
#worker_initial_config ⇒ Object
167 168 169 |
# File 'lib/scbi_mapreduce/work_manager.rb', line 167
# Hook providing the configuration that #send_initial_config sends to each
# new worker. The default nil makes it send :no_initial_config.
def worker_initial_config
  nil
end