Class: Fairy::Controller::NjobMapper

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/controller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cont, target_bjob, input_filter) ⇒ NjobMapper

Returns a new instance of NjobMapper.



791
792
793
794
795
796
797
798
799
800
801
802
# File 'lib/fairy/controller.rb', line 791

# Stores the collaborators of this mapper and selects its mapping policy.
#
# cont::         kept in @controller
# target_bjob::  kept in @target_bjob; its #input is kept in @pre_bjob
# input_filter:: kept in @input_filter
def initialize(cont, target_bjob, input_filter)
	@controller = cont
	@input_filter = input_filter
	@target_bjob = target_bjob
	@pre_bjob = target_bjob.input

	# Must run before the log line below, which reads @policy.
	init_policy

	Log.debug(self, "Mapping Policy: #{@pre_bjob.class} -(#{@policy.class})-> #{@target_bjob.class}")
end

Instance Attribute Details

#controller ⇒ Object (readonly)

Returns the value of attribute controller.



804
805
806
# File 'lib/fairy/controller.rb', line 804

# Reader for @controller, which the constructor sets from its +cont+ argument.
def controller
  @controller
end

#input_filter ⇒ Object (readonly)

Returns the value of attribute input_filter.



807
808
809
# File 'lib/fairy/controller.rb', line 807

# Reader for @input_filter, which the constructor sets from its
# +input_filter+ argument.
def input_filter
  @input_filter
end

#policy ⇒ Object (readonly)

Returns the value of attribute policy.



808
809
810
# File 'lib/fairy/controller.rb', line 808

# Reader for @policy, the mapping policy object assigned by init_policy.
def policy
  @policy
end

#pre_bjob ⇒ Object (readonly)

Returns the value of attribute pre_bjob.



805
806
807
# File 'lib/fairy/controller.rb', line 805

# Reader for @pre_bjob, which the constructor sets to the target bjob's #input.
def pre_bjob
  @pre_bjob
end

#target_bjob ⇒ Object (readonly)

Returns the value of attribute target_bjob.



806
807
808
# File 'lib/fairy/controller.rb', line 806

# Reader for @target_bjob, which the constructor sets from its
# +target_bjob+ argument.
def target_bjob
  @target_bjob
end

Instance Method Details

#assign_ntask(&block) ⇒ Object



849
850
851
# File 'lib/fairy/controller.rb', line 849

# Delegates to @policy.assign_ntask, forwarding the caller's block, and
# returns whatever the policy returns.
def assign_ntask(&block)
	@policy.assign_ntask(&block)
end

#bind_input(njob) ⇒ Object



853
854
855
# File 'lib/fairy/controller.rb', line 853

# Delegates to @policy.bind_input with the given njob and returns whatever
# the policy returns.
def bind_input(njob)
	@policy.bind_input(njob)
end

#init_policy ⇒ Object



810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
# File 'lib/fairy/controller.rb', line 810

# Chooses the mapping policy object and stores it in @policy.
#
# When @pre_bjob responds to postmapping_policy and that value is truthy, its
# string form is eval'ed to obtain the policy class and the method returns nil
# right after the assignment. Otherwise the policy class is picked from the
# class of @pre_bjob (MPSameNTask when nothing matches) and the freshly built
# policy object is returned.
def init_policy
	if @pre_bjob.respond_to?(:postmapping_policy) && @pre_bjob.postmapping_policy
	  # NOTE(review): the policy name goes through eval; this is only safe
	  # while postmapping_policy is set by trusted code -- confirm.
	  @policy = eval(@pre_bjob.postmapping_policy.to_s).new(self)
	  return
	end

	# Not needed for now (handled when njobs are created lazily):
	#   @policy = MPNewProcessorN.new(self) if @target_bjob.kind_of?(BShuffle)

	policy_class =
	  case @pre_bjob
	  when CFilePlace
	    # BInput family
	    MPInputProcessor
	  when CLocalIOPlace
	    MPLocalInputNewProcessorN
	  when CIotaPlace, CTherePlace
	    # A separate CIotaPlace -> MPIotaInputProcessor branch is commented out.
	    MPInputNewProcessorN
	  when CVarrayPlace
	    MPVarrayInputProcessor
	  when CBasicGroupBy, CDirectProduct::CPreFilter, CWC
	    # CSegShuffle is commented out of this list, as is the MPNewProcessor
	    # alternative for this branch.
	    MPNewProcessorN
	  when CSegSplit, CInject::CLocalInject, CFind::CLocalFind
	    MPNewProcessor
	  else
	    # Commented-out branches: CSegShuffle -> MPPostShuffle,
	    # CZip::CPreZipFilter -> MPZippedFilter.
	    MPSameNTask
	  end

	@policy = policy_class.new(self)
end