Method: Fairy::Controller#assign_input_processor_n

Defined in:
lib/fairy/controller.rb

#assign_input_processor_n(bjob, host, &block) ⇒ Object



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/fairy/controller.rb', line 444

# Picks a processor for +bjob+ on the node that @master returns for +host+,
# either by creating a new one or by reserving the least loaded existing one.
#
# Every pass of the loop does the following:
#
# 1. Asks @master.node_in_reisured(host) for a node. When nothing is
#    returned, an ERR::NodeNotArrived error is raised and immediately
#    rescued so it can be handed to handle_exception, after which
#    AbortCreateNode is raised to the caller.
# 2. Walks a copy of @bjob2processors[bjob], counting the processors that
#    sit on that node and remembering the one for which
#    no_active_ntasks_in_processor returns the smallest value (the first
#    one wins on ties).
# 3. Decides what to do:
#    * CONTROLLER_INPUT_PROCESSOR_N is nil or >= the count: create a new
#      processor on the node and return.
#    * otherwise, if even the smallest value exceeds
#      CONTROLLER_MAX_ACTIVE_TASKS_IN_PROCESSOR: wait on
#      @no_active_ntasks_cv and start the next pass.
#    * otherwise: reserve the least loaded processor, register it for
#      +bjob+ and yield it; when reserve_processor returns a falsy value,
#      fall back to assign_new_processor.
#
# bjob::  key into @bjob2processors; also passed on to the helpers.
# host::  value passed to @master.node_in_reisured.
# block:: forwarded to create_processor / assign_new_processor, or yielded
#         the reserved processor.
#
# Returns nil. Raises AbortCreateNode when no node is available for +host+.
def assign_input_processor_n(bjob, host, &block)
  processor_limit = CONF.CONTROLLER_INPUT_PROCESSOR_N
  ntask_limit = CONF.CONTROLLER_MAX_ACTIVE_TASKS_IN_PROCESSOR

  loop do
    node = @master.node_in_reisured(host)
    unless node
      begin
        # Raised and rescued on the spot so handle_exception receives a real
        # exception object before the abort is signalled.
        ERR::Raise ERR::NodeNotArrived, host
      rescue => e
        handle_exception(e)
        raise AbortCreateNode
      end
    end

    processors_on_node = 0
    least_busy = nil
    fewest_ntasks = nil
    # Iterate over a copy of the processor list rather than the list itself.
    @bjob2processors[bjob].dup.each do |candidate|
      next if candidate.node != node
      processors_on_node += 1

      active = no_active_ntasks_in_processor(candidate)
      if !fewest_ntasks || fewest_ntasks > active
        fewest_ntasks = active
        least_busy = candidate
      end
    end

    # NOTE(review): with ">=" a new processor is still created when the count
    # already equals the limit -- confirm this is the intended bound.
    if processor_limit.nil? || processor_limit >= processors_on_node
      create_processor(node, bjob, &block)
      return
    end

    if fewest_ntasks > ntask_limit
      # Every processor on the node is above the task limit: block on the
      # condition variable, then re-evaluate everything from the top.
      @no_active_ntasks_mutex.synchronize do
        Log::debug(self, "NO_ACTIVE_NTASKS: WAIT")
        @no_active_ntasks_cv.wait(@no_active_ntasks_mutex)
        Log::debug(self, "NO_ACTIVE_NTASKS: WAIT END")
      end
      next
    end

    reserved = reserve_processor(least_busy) do |processor|
      register_processor(bjob, processor)
      yield processor
    end
    # The processor had already terminated. It feels like this case could be
    # handled a bit better.
    assign_new_processor(bjob, &block) unless reserved
    return
  end
end