Class: ScbiMapreduce::WorkManager

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
EM::P::ObjectProtocol
Defined in:
lib/scbi_mapreduce/work_manager.rb

Overview

require 'json'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ WorkManager

Returns a new instance of WorkManager.



638
639
640
641
# File 'lib/scbi_mapreduce/work_manager.rb', line 638

# Builds the handler for one worker connection (the original author notes one
# instance exists per worker). All arguments are forwarded untouched to
# EventMachine::Connection#initialize.
def initialize(*args)
  super
end

Class Method Details

.checkpointObject



292
293
294
# File 'lib/scbi_mapreduce/work_manager.rb', line 292

# Reader for the checkpoint number loaded by init_work_manager_internals.
# goto_checkpoint resets it to 0 once the checkpoint has been consumed.
def self.checkpoint
  @@checkpoint
end

.controlled_exitObject



504
505
506
507
# File 'lib/scbi_mapreduce/work_manager.rb', line 504

# Asks for a graceful shutdown. It only raises the exit flag; each worker is told
# to :quit the next time send_next_work runs for it.
def self.controlled_exit
  $SERVER_LOG.info("Controlled exit. Workers will be noticed in next round")
  @@want_to_exit = true
end

.end_work_managerObject



147
148
149
# File 'lib/scbi_mapreduce/work_manager.rb', line 147

# Class-level hook called by stop_work_manager right after EM.stop is requested
# and before the final statistics are computed. Empty in this class
# (presumably overridden by a subclass).
def self.end_work_manager
  nil
end

.get_checkpointObject



322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/scbi_mapreduce/work_manager.rb', line 322

# Reads the checkpoint number saved by a previous run.
#
# @param checkpoint_file [String] path of the checkpoint file
#   (defaults to CHECKPOINT_FILE)
# @return [Integer] the saved checkpoint; 0 when the file does not exist or
#   cannot be read (non-numeric contents also give 0 through String#to_i)
def self.get_checkpoint(checkpoint_file = CHECKPOINT_FILE)
  # File.exist?, not File.exists?: the latter was removed in Ruby 3.2, and the
  # resulting NoMethodError used to be swallowed by a blanket rescue, so the
  # checkpoint was silently never loaded.
  return 0 unless File.exist?(checkpoint_file)

  File.read(checkpoint_file).chomp.to_i
rescue SystemCallError, IOError
  # best effort: an unreadable checkpoint means "start from the beginning"
  0
end

.global_error_received(error_exception) ⇒ Object



155
156
157
# File 'lib/scbi_mapreduce/work_manager.rb', line 155

# Class-level hook that send_next_work calls with the exception raised while
# collecting work through next_work. It does nothing here
# (presumably overridden by a subclass).
def self.global_error_received(error_exception)
  nil
end

.init_work_managerObject



143
144
145
# File 'lib/scbi_mapreduce/work_manager.rb', line 143

# Class-level setup hook; empty in this class. No caller is visible in this
# file. It is presumably invoked by the server before workers connect
# (TODO confirm).
def self.init_work_manager
  nil
end

.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/scbi_mapreduce/work_manager.rb', line 214

# Prepares the class-level state for a new run. It resets all counters and
# timing accumulators, stores and logs the run options, and, when
# checkpointing is enabled, loads the saved checkpoint number.
#
# @param checkpointing [Boolean] save checkpoints and restore the last one
# @param keep_order [Boolean] write results in the order chunks were sent
# @param retry_stuck_jobs [Boolean] re-send running jobs whose stuck? is true
# @param exit_on_many_errors [Boolean] request a controlled exit on too many errors
# @param chunk_size [Integer] number of objects per chunk sent to a worker
def self.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs,exit_on_many_errors,chunk_size)
  # job bookkeeping
  @@stats = {}
  @@running_jobs = []
  @@want_to_exit = false
  @@count = @@retried_jobs = @@sent_chunks = @@received_objects = 0
  @@chunk_count = @@workers = @@max_workers = @@error_count = 0

  # run options (TODO - Implement a dynamic chunk_size)
  @@checkpointing = checkpointing
  @@keep_order = keep_order
  @@retry_stuck_jobs = retry_stuck_jobs
  @@exit_on_many_errors = exit_on_many_errors
  @@chunk_size = chunk_size

  $SERVER_LOG.info "Processing in chunks of #{@@chunk_size} objects"
  $SERVER_LOG.info "Checkpointing: #{@@checkpointing}"
  $SERVER_LOG.info "Keeping output order: #{@@keep_order}"
  $SERVER_LOG.info "Retrying stuck jobs: #{@@retry_stuck_jobs}"
  $SERVER_LOG.info "Exiting on too many errors: #{@@exit_on_many_errors}"

  @@checkpoint = 0
  if @@checkpointing
    @@checkpoint = get_checkpoint
    $SERVER_LOG.info "Detected checkpoint at #{@@checkpoint}"
  end

  # timing accumulators used by stop_work_manager for the final statistics
  @@each_worker_time = {}
  @@each_transmission_time = {}
  @@total_seconds = @@total_manager_time = 0
  @@total_read_time = @@total_write_time = 0
end

.save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json') ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
# File 'lib/scbi_mapreduce/work_manager.rb', line 202

# Writes run statistics to disk as pretty-printed JSON.
#
# @param stats [Hash, nil] statistics to write; nil means the class-level
#   @@stats (filled in by stop_work_manager)
# @param filename [String] destination path; overwritten if it exists
# @return [nil]
def self.save_stats(stats=nil, filename='scbi_mapreduce_stats.json')
  # @@stats is only read when the caller did not pass explicit stats
  data = stats.nil? ? @@stats : stats

  # Block form closes the file even if JSON generation raises. The data is also
  # resolved before the file is opened, so a failure no longer truncates it.
  File.open(filename, 'w') { |f| f.puts JSON.pretty_generate(data) }
end

.statsObject



198
199
200
# File 'lib/scbi_mapreduce/work_manager.rb', line 198

# Reader for the statistics hash. It is reset by init_work_manager_internals
# and filled in by stop_work_manager.
def self.stats
  @@stats
end

.work_manager_finishedObject



151
152
153
# File 'lib/scbi_mapreduce/work_manager.rb', line 151

# Class-level hook called as the last step of stop_work_manager, after the final
# statistics have been logged. Empty here (presumably overridden by a subclass).
def self.work_manager_finished
  nil
end

Instance Method Details

#checkpointable_job_received(obj) ⇒ Object



567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
# File 'lib/scbi_mapreduce/work_manager.rb', line 567

# Ordered result handling. receive_object uses it whenever checkpointing,
# keep_order or retry_stuck_jobs is enabled.
#
# Steps:
# 1. Marks the matching job in @@running_jobs as received.
# 2. Flushes results in send order: the leading run of received jobs is passed to
#    work_received once that run is at least PENDING_TO_SAVE long or covers every
#    running job.
# 3. Drops flushed jobs from @@running_jobs.
# 4. When checkpointing, moves the checkpoint file aside before the flush and
#    rewrites it afterwards, unless a controlled exit is pending.
#
# A result whose job is no longer tracked only produces a warning.
def checkpointable_job_received(obj)
  received_job = @@running_jobs.find { |job| job.job_identifier == obj.job_identifier }

  if received_job
    received_job.update_with_received!(obj)

    # only a contiguous run of finished jobs from the head can be written in order
    ready = @@running_jobs.take_while { |job| job.status == :received }.count

    if ready >= PENDING_TO_SAVE || ready == @@running_jobs.count
      remove_checkpoint if @@checkpointing

      flushed = 0
      @@running_jobs.each do |job|
        break unless job.status == :received

        started_at = Time.now_us
        work_received(job.data)
        @@received_objects += job.data.count
        @@total_write_time += (Time.now_us - started_at)

        job.status = :saved
        flushed += 1
      end

      if flushed > 0
        @@running_jobs.shift(flushed)
        save_checkpoint if @@checkpointing && !@@want_to_exit
      end
    end
  else
    $SERVER_LOG.warn "Job already processed #{obj.inspect}"
  end
end

#each_transmission_time(worker, time) ⇒ Object



284
285
286
287
288
289
# File 'lib/scbi_mapreduce/work_manager.rb', line 284

# Adds +time+ to the accumulated transmission time recorded for +worker+.
# receive_object passes the worker identifier reported with each result.
def each_transmission_time(worker,time)
  total = @@each_transmission_time[worker]
  total = 0 if total.nil?
  @@each_transmission_time[worker] = total + time
end

#each_worker_time(worker, time) ⇒ Object



277
278
279
280
281
282
# File 'lib/scbi_mapreduce/work_manager.rb', line 277

# Adds +time+ to the accumulated worker-side processing time recorded for
# +worker+. receive_object passes the worker identifier reported with each result.
def each_worker_time(worker,time)
  total = @@each_worker_time[worker]
  total = 0 if total.nil?
  @@each_worker_time[worker] = total + time
end

#error_received(worker_error, obj) ⇒ Object



171
172
173
# File 'lib/scbi_mapreduce/work_manager.rb', line 171

# Hook called by receive_object when a worker reports an error. It receives the
# error object sent by the worker and the data of the chunk that failed.
# Does nothing here (presumably overridden by a subclass).
def error_received(worker_error, obj)
  nil
end

#goto_checkpointObject

Loads a previously saved checkpoint.



448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# File 'lib/scbi_mapreduce/work_manager.rb', line 448

# Restores a previously saved checkpoint before any work is handed out. It does
# nothing unless @@checkpoint > 0. load_user_checkpoint decides how to restore:
#   -1 -> automatic restore: discard (checkpoint - 1) chunks by calling
#         next_work chunk_size times for each, then resume job ids at the
#         checkpoint
#   >0 -> the user restored state; resume job ids at the returned value
#    0 -> no restore
# @@checkpoint is cleared afterwards so the restore happens only once.
def goto_checkpoint
  return unless @@checkpoint > 0

  $SERVER_LOG.info "Skipping until checkpoint #{@@checkpoint}"
  restored = load_user_checkpoint(@@checkpoint)

  if restored == -1
    1.upto(@@checkpoint - 1) do |chunk_number|
      $SERVER_LOG.info "Automatic trashing Chunk #{chunk_number}"
      # read and drop one chunk's worth of input
      @@chunk_size.times { next_work }
    end

    $SERVER_LOG.info "Automatic checkpoint finished"
    WorkManagerData.job_id = @@checkpoint
  elsif restored > 0
    WorkManagerData.job_id = restored
  elsif restored == 0
    $SERVER_LOG.info "Automatic checkpoint not done"
  end

  @@checkpoint = 0
end

#load_user_checkpoint(checkpoint) ⇒ Object

If this method returns -1, the automatic checkpoint restore is performed. Return 0 to skip checkpoint restoration, or return the restored checkpoint number to resume from that point.



186
187
188
# File 'lib/scbi_mapreduce/work_manager.rb', line 186

# Hook used by goto_checkpoint to restore user state for +checkpoint+.
# Return -1 to request the automatic restore (skip input via next_work),
# 0 to skip restoring, or the restored checkpoint number to resume from it.
# The default asks for the automatic restore.
def load_user_checkpoint(checkpoint)
  -1
end

#mean_time(h) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/scbi_mapreduce/work_manager.rb', line 262

# Arithmetic mean of a hash's values. stop_work_manager uses it on the
# per-worker time totals.
#
# @param h [Hash] worker => accumulated time
# @return [Numeric] the mean as a Float, or 0 for an empty hash
def mean_time(h)
  # Guard on the element count, not the sum. The old `sum > 0` test returned the
  # raw sum instead of the mean whenever the total was negative.
  return 0 if h.empty?

  h.values.inject(0, :+) / h.size.to_f
end

#next_workObject



159
160
161
# File 'lib/scbi_mapreduce/work_manager.rb', line 159

# Hook that supplies the next input object. send_next_work calls it up to
# chunk_size times per chunk, and a nil return means no more input.
# goto_checkpoint also calls it to skip input. Returns nil here
# (presumably overridden by a subclass).
def next_work
  nil
end

#post_initObject



485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
# File 'lib/scbi_mapreduce/work_manager.rb', line 485

# EventMachine::Connection callback for a newly connected worker.
#
# It registers the worker. The first connection starts the run clock and, when
# checkpointing is enabled, restores the checkpoint. The worker then receives
# its initial config followed by its first piece of work.
def post_init
  @@workers += 1
  @@max_workers += 1

  # the first connection marks the start of the run
  if @@workers == 1
    @@total_seconds = Time.now_us
    $SERVER_LOG.info "First worker connected"

    $SERVER_LOG.info "Checking for checkpoint" if @@checkpointing
    goto_checkpoint if @@checkpointing
  end

  $SERVER_LOG.info "#{@@workers} workers connected"
  send_initial_config
  send_next_work
end


#print_running_jobsObject

350
351
352
353
# File 'lib/scbi_mapreduce/work_manager.rb', line 350

# Debug helper: logs every job in @@running_jobs, one inspect line per job.
def print_running_jobs
  listing = @@running_jobs.map(&:inspect)
  $SERVER_LOG.debug("Running Jobs:\n" + listing.join("\n"))
end

#read_until_checkpoint(checkpoint) ⇒ Object



179
180
181
# File 'lib/scbi_mapreduce/work_manager.rb', line 179

# Checkpoint hook that takes a checkpoint number. It is empty here and nothing in
# this file calls it (NOTE(review): possibly unused — confirm before relying on it).
def read_until_checkpoint(checkpoint)
  nil
end

#receive_object(obj) ⇒ Object



510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
# File 'lib/scbi_mapreduce/work_manager.rb', line 510

# EM::P::ObjectProtocol callback: handles every message a worker sends back.
#   * an Exception  -> log it, count it, call error_received and, past the
#                      error threshold, possibly request a controlled exit
#   * :waking_up    -> a sleeping worker asks for work again
#   * anything else -> a finished chunk, written through work_received (directly,
#                      or in order via checkpointable_job_received) plus timing stats
# The worker is then always answered through send_next_work.
def receive_object(obj)
  if obj.is_a?(Exception)
    # Array() tolerates an exception that was never raised (nil backtrace)
    backtrace = Array(obj.original_exception.backtrace).join("\n")
    $SERVER_LOG.error("Error in worker #{obj.worker_id} while processing object #{obj.object.inspect}\n" + obj.original_exception.message + ":\n" + backtrace)

    @@error_count += 1

    error_received(obj, obj.object.data)

    # too many errors: more than 100 objects sent and errors reach 80% of that count
    if (@@count > 100) && (@@error_count >= @@count * 0.8)
      # notice programmer
      res = too_many_errors_received

      # force exit if configured to, or if too_many_errors_received returns true
      if @@exit_on_many_errors || res
        $SERVER_LOG.error("Want to exit due to too many errors")
        # controlled_exit is a class method; calling it on self (an instance)
        # raised NoMethodError
        self.class.controlled_exit
      end
    end

  elsif obj == :waking_up
    $SERVER_LOG.info("Worker woke up")
  else
    obj.received!(obj.data)

    if @@checkpointing || @@keep_order || @@retry_stuck_jobs
      # result must go through the running-jobs queue (ordering/checkpoints/retries)
      checkpointable_job_received(obj)
    else
      # no ordering needed: write the result right away
      t = Time.now_us
      work_received(obj.data)
      @@received_objects += obj.data.count
      @@total_write_time += (Time.now_us - t)
    end

    each_worker_time(obj.worker_identifier, obj.worker_time)
    each_transmission_time(obj.worker_identifier, obj.transmission_time)
  end

  send_next_work
end

#remove_checkpointObject



296
297
298
299
300
# File 'lib/scbi_mapreduce/work_manager.rb', line 296

# Moves the current checkpoint file aside, if it exists.
# checkpointable_job_received calls this before writing a batch of results and
# save_checkpoint afterwards. Presumably the point is that an interrupted write
# leaves no checkpoint claiming those results were saved.
#
# @param checkpoint_file [String] current checkpoint (defaults to CHECKPOINT_FILE)
# @param old_checkpoint_file [String] destination (defaults to OLD_CHECKPOINT_FILE)
def remove_checkpoint(checkpoint_file = CHECKPOINT_FILE, old_checkpoint_file = OLD_CHECKPOINT_FILE)
  # File.exist?: File.exists? was removed in Ruby 3.2 and raised NoMethodError here
  return unless File.exist?(checkpoint_file)

  FileUtils.mv(checkpoint_file, old_checkpoint_file)
end

#save_checkpointObject



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/scbi_mapreduce/work_manager.rb', line 303

# Persists the current checkpoint number to CHECKPOINT_FILE, then calls
# save_user_checkpoint so user state can be saved too.
#
# The checkpoint is the job_identifier of the oldest job still in
# @@running_jobs, or WorkManagerData.job_id when no job is outstanding.
def save_checkpoint
  checkpoint_value = if @@running_jobs.empty?
                       WorkManagerData.job_id
                     else
                       @@running_jobs.first.job_identifier
                     end

  $SERVER_LOG.info "Saving checkpoint: #{checkpoint_value}"

  # The value is computed before opening (opening truncates the file), and the
  # block form closes the file even if the write fails.
  File.open(CHECKPOINT_FILE, 'w') { |f| f.puts checkpoint_value }

  save_user_checkpoint
end

#save_user_checkpointObject



190
191
# File 'lib/scbi_mapreduce/work_manager.rb', line 190

# Hook called at the end of save_checkpoint, after the checkpoint file has been
# written. Empty here (presumably overridden by a subclass).
def save_user_checkpoint
  nil
end

#send_initial_configObject



338
339
340
341
342
343
344
345
346
347
348
# File 'lib/scbi_mapreduce/work_manager.rb', line 338

# Sends this worker its start-up configuration. The message is the value of
# worker_initial_config wrapped as {:initial_config => config}, or the symbol
# :no_initial_config when that hook returns nil.
def send_initial_config
  config = worker_initial_config
  message = config.nil? ? :no_initial_config : { :initial_config => config }
  send_object(message)
end

#send_next_workObject

Sends the next piece of work to the worker.



382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
# File 'lib/scbi_mapreduce/work_manager.rb', line 382

# Answers a worker that is ready for more work. In priority order it sends:
#   :quit        when a controlled exit was requested,
#   a stuck job  when send_stuck_work re-sent one (nothing more is sent then),
#   a new chunk  of up to @@chunk_size objects taken from next_work,
#   :sleep       when input is exhausted but jobs are still running,
#   :quit        when input is exhausted and nothing is running.
def send_next_work
  if @@want_to_exit
    # a controlled exit was requested: tell this worker to stop
    send_object(:quit)

  elsif !send_stuck_work
    objs = []

    t = Time.now_us
    begin
      # collect up to @@chunk_size new objects; nil from next_work means no more input
      @@chunk_size.times do
        obj = next_work
        break if obj.nil?

        objs << obj
      end
    rescue StandardError => e
      # StandardError rather than Exception, so signals and SystemExit still propagate.
      # NOTE(review): objects gathered before the failure are still sent below,
      # so this worker may get :sleep followed by a second message.
      $SERVER_LOG.error("Exception creating next_work. Worker, quit!")
      send_object(:sleep)
      self.class.global_error_received(e)
    end
    @@total_read_time += (Time.now_us - t)

    if objs.count > 0
      @@count += objs.count
      @@chunk_count += 1

      work_data = WorkManagerData.new(objs)
      send_object(work_data)
      @@sent_chunks += 1

      # Track the job whenever its result will be routed to
      # checkpointable_job_received. receive_object does that for checkpointing
      # as well as keep_order/retry_stuck_jobs. An untracked job's result would
      # be logged as "already processed" and never written.
      if @@checkpointing || @@keep_order || @@retry_stuck_jobs
        # data is kept so the job can be re-sent if it gets stuck
        @@running_jobs.push work_data
      end
    elsif @@running_jobs.count > 0
      # input exhausted but results still pending: park the worker
      $SERVER_LOG.info("Worker, go to sleep")
      send_object(:sleep)
    else
      # send a quit value indicating no more data available
      send_object(:quit)
    end
  end
end

#send_stuck_workObject



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/scbi_mapreduce/work_manager.rb', line 355

# When retry_stuck_jobs is enabled, re-sends the first running job whose stuck?
# is true and logs all stuck jobs.
#
# @return [Boolean] true if a stuck job was re-sent, false otherwise
def send_stuck_work
  return false unless @@retry_stuck_jobs

  stuck = @@running_jobs.select(&:stuck?)
  return false if stuck.empty?

  listing = stuck.map(&:inspect).join("\n")
  $SERVER_LOG.info("Stuck Jobs:\n#{listing}")

  job = stuck.first
  job.sent!
  send_object(job)
  @@sent_chunks += 1
  @@retried_jobs += 1
  $SERVER_LOG.info("Sending stuck work #{job.inspect}")
  true
end

#stop_work_managerObject



660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
# File 'lib/scbi_mapreduce/work_manager.rb', line 660

# Shuts the server down once the last worker has disconnected.
#
# Steps:
# 1. Stops EventMachine and calls the end_work_manager hook.
# 2. Computes the final run statistics into @@stats and logs them.
# 3. Calls the work_manager_finished hook.
def stop_work_manager
  EM.stop
  $SERVER_LOG.info  "Exiting server"

  self.class.end_work_manager

  # post_init stored the start time in @@total_seconds; turn it into the elapsed time
  @@total_seconds = (Time.now_us - @@total_seconds)

  mean_worker_time = mean_time(@@each_worker_time)
  mean_transmission_time = mean_time(@@each_transmission_time)
  idle_time = (@@total_seconds - @@total_read_time - @@total_write_time - mean_transmission_time)
  processing_rate = (@@count / @@total_seconds.to_f)

  # key order is kept in the JSON written by save_stats
  @@stats = {
    total_objects: @@count,
    total_seconds: @@total_seconds,
    sent_chunks: @@sent_chunks,
    received_objects: @@received_objects,
    processing_rate: processing_rate,
    total_read_time: @@total_read_time,
    total_write_time: @@total_write_time,
    mean_worker_time: mean_worker_time,
    mean_transmission_time: mean_transmission_time,
    total_manager_idle_time: idle_time,
    error_count: @@error_count,
    retried_jobs: @@retried_jobs,
    chunk_size: @@chunk_size,
    connected_workers: @@max_workers,
    each_transmission_time: @@each_transmission_time,
    each_worker_time: @@each_worker_time
  }

  $SERVER_LOG.info  "Total processed: #{@@count} objects in #{@@total_seconds} seconds"
  # @@sent_chunks counts chunks (including retried ones), not objects
  $SERVER_LOG.info  "Total sent chunks: #{@@sent_chunks} chunks"

  $SERVER_LOG.info  "Total sent objects: #{@@count} objects"
  $SERVER_LOG.info  "Total received objects: #{@@received_objects} objects"

  $SERVER_LOG.info  "Processing rate: #{"%.2f" % processing_rate} objects per second"
  $SERVER_LOG.info  "Connection rate: #{"%.2f" % (@@chunk_count/@@total_seconds.to_f)} connections per second"

  $SERVER_LOG.info "Total read time #{@@total_read_time} seconds"
  $SERVER_LOG.info "Total write time #{@@total_write_time} seconds"
  $SERVER_LOG.info "Total worker time #{mean_worker_time} seconds"
  $SERVER_LOG.info "Total transmission time #{mean_transmission_time} seconds"
  $SERVER_LOG.info "Total manager_idle time #{idle_time} seconds"

  $SERVER_LOG.info  "Number of errors: #{@@error_count}"
  $SERVER_LOG.info  "Number of retried stuck jobs: #{@@retried_jobs}"
  $SERVER_LOG.info  "Chunk size: #{@@chunk_size}"
  $SERVER_LOG.info  "Total connected workers: #{@@max_workers}"

  self.class.work_manager_finished
end

#too_many_errors_receivedObject



175
176
177
# File 'lib/scbi_mapreduce/work_manager.rb', line 175

# Hook called by receive_object when the error threshold is reached. A truthy
# return forces a controlled exit even if exit_on_many_errors is off.
# Returns nil here (presumably overridden by a subclass).
def too_many_errors_received
  nil
end

#trash_checkpointed_workObject



193
194
195
# File 'lib/scbi_mapreduce/work_manager.rb', line 193

# Hook for discarding already-processed work while restoring a checkpoint.
# It is empty here, and its only reference in this file is a commented-out call
# inside goto_checkpoint, so it is currently not invoked.
def trash_checkpointed_work
  nil
end

#unbindObject

A worker has disconnected.



644
645
646
647
648
649
650
651
652
653
654
655
656
# File 'lib/scbi_mapreduce/work_manager.rb', line 644

# EventMachine::Connection callback for a closed worker connection. It decrements
# the live-worker count and calls stop_work_manager once no worker remains.
def unbind
  @@workers -= 1
  $SERVER_LOG.info  "Worker disconnected. #{@@workers} kept running"

  return unless @@workers.zero?

  $SERVER_LOG.info  "All workers finished"
  stop_work_manager
end

#work_received(obj) ⇒ Object



163
164
165
# File 'lib/scbi_mapreduce/work_manager.rb', line 163

# Hook that receives the data of each finished chunk. It is called either
# directly from receive_object or, in send order, from
# checkpointable_job_received. Does nothing here (presumably overridden by a subclass).
def work_received(obj)
  nil
end

#worker_initial_configObject



167
168
169
# File 'lib/scbi_mapreduce/work_manager.rb', line 167

# Hook providing the configuration sent to every worker on connect. A nil
# return makes send_initial_config send :no_initial_config instead. Returns nil
# here (presumably overridden by a subclass).
def worker_initial_config
  nil
end