Class: Fairy::Controller::NjobMapper

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/controller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cont, target_bjob, input_filter) ⇒ NjobMapper

Returns a new instance of NjobMapper.



812
813
814
815
816
817
818
819
820
821
822
823
# File 'lib/fairy/controller.rb', line 812

def initialize(cont, target_bjob, input_filter)
	@controller = cont
	@target_bjob = target_bjob

	@pre_bjob = @target_bjob.input
	@input_filter = input_filter

	init_policy

	Log::debug(self, "Mapping Policy: #{@pre_bjob.class} -(#{@policy.class})-> #{@target_bjob.class}")
	
end

Instance Attribute Details

#controllerObject (readonly)

Returns the value of attribute controller.



825
826
827
# File 'lib/fairy/controller.rb', line 825

def controller
  @controller
end

#input_filterObject (readonly)

Returns the value of attribute input_filter.



828
829
830
# File 'lib/fairy/controller.rb', line 828

def input_filter
  @input_filter
end

#policyObject (readonly)

Returns the value of attribute policy.



829
830
831
# File 'lib/fairy/controller.rb', line 829

def policy
  @policy
end

#pre_bjobObject (readonly)

Returns the value of attribute pre_bjob.



826
827
828
# File 'lib/fairy/controller.rb', line 826

def pre_bjob
  @pre_bjob
end

#target_bjobObject (readonly)

Returns the value of attribute target_bjob.



827
828
829
# File 'lib/fairy/controller.rb', line 827

def target_bjob
  @target_bjob
end

Instance Method Details

#assign_ntask(&block) ⇒ Object



870
871
872
# File 'lib/fairy/controller.rb', line 870

def assign_ntask(&block)
	@policy.assign_ntask(&block)
end

#bind_input(njob) ⇒ Object



874
875
876
# File 'lib/fairy/controller.rb', line 874

def bind_input(njob)
	@policy.bind_input(njob)
end

#init_policyObject



831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
# File 'lib/fairy/controller.rb', line 831

def init_policy
	if @pre_bjob.respond_to?(:postmapping_policy) && 
	    @pre_bjob.postmapping_policy
	  @policy = eval(@pre_bjob.postmapping_policy.to_s).new(self)
	  return
	end

# 今のところは必要なし(lazy create njob時に対応)
#  	if @target_bjob.kind_of?(BShuffle)
#  	  @policy = MPNewProcessorN.new(self)
#  	  return
#  	end

	case @pre_bjob
	when CFilePlace
	  #BInput系
	  @policy = MPInputProcessor.new(self)
	when CLocalIOPlace
	  @policy = MPLocalInputNewProcessorN.new(self)
	when CIotaPlace, CTherePlace
	  @policy = MPInputNewProcessorN.new(self)
	when CVarrayPlace
	  @policy = MPVarrayInputProcessor.new(self)
#	when CIotaPlace
#	  @policy = MPIotaInputProcessor.new(self)
	when CBasicGroupBy, CDirectProduct::CPreFilter, CWC #, CSegShuffle 
	  @policy = MPNewProcessorN.new(self)
#	  @policy = MPNewProcessor.new(self)
	when CSegSplit, CInject::CLocalInject, CFind::CLocalFind
	  @policy = MPNewProcessor.new(self)
#	when CSegShuffle
#	  @policy = MPPostShuffle.new(self)
#	when CZip::CPreZipFilter
#	  @policy = MPZippedFilter.new(self)
	else
	  @policy = MPSameNTask.new(self)
	end
end