Class: Fairy::CSort::CPreSort

Inherits:
Fairy::CBasicGroupBy show all
Defined in:
lib/fairy/master/c-sort.rb

Instance Attribute Summary

Attributes inherited from Fairy::CIOFilter

#input

Attributes included from Fairy::CInputtable

#input

Instance Method Summary collapse

Methods inherited from Fairy::CBasicGroupBy

#add_exports, #all_node_arrived?, #all_node_imported?, #bind_export, #each_assigned_filter, #each_export_by, #start_create_nodes, #start_watch_all_node_imported, #start_watch_all_node_imported_ORG, #update_exports

Methods inherited from Fairy::CIOFilter

#node_class, #output=

Methods included from Fairy::CInputtable

#break_running, #inputtable?

Methods inherited from Fairy::CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_create_nodes, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CPreSort

Returns a new instance of CPreSort.



16
17
18
19
20
21
22
23
24
# File 'lib/fairy/master/c-sort.rb', line 16

def initialize(controller, opts, block_source)
	super

	@samplings = []

	@pvs = nil
	@pvs_mutex = Mutex.new
	@pvs_cv = ConditionVariable.new
end

Instance Method Details

#get_pvs(buf = nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fairy/master/c-sort.rb', line 26

def get_pvs(buf=nil)
# BUG#271対応. 全てのセグメントからサンプルを取るのではなく, 最初のセ
# グメントからのみサンプリングを取るようにした.
	if buf
	  @samplings.push buf

# BUG#271対応. 
#	if @samplings.size >= number_of_nodes
#	    make_pvs
#	end
	  make_pvs
	end

	@pvs_mutex.synchronize do
	  while !@pvs
	    @pvs_cv.wait(@pvs_mutex)
	  end
	end
	@pvs
end

#make_pvsObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fairy/master/c-sort.rb', line 48

def make_pvs
	no_segment = @opts[:no_segment]
	no_segment ||= Fairy::CONF.SORT_NO_SEGMENT


	cmp_opt = @opts[:cmp_optimize]
	cmp_opt = CONF.SORT_CMP_OPTIMIZE if cmp_opt.nil?
	
	if cmp_opt
	  key_proc = eval("proc{#{@block_source.source}}", @context.binding)
	else
	  key_proc = BBlock.new(@block_source, @context, self)
	end

	sorted = @samplings.flatten(1).sort_by{|e| key_proc.call(e)}

#Log::debugf(self, "%s", sorted.inspect)
	idxes = (1...no_segment).collect{|i| (sorted.size*i).div(no_segment)}
	@pvs_mutex.synchronize do
	  @pvs = sorted.values_at(*idxes)
	  @pvs_cv.broadcast
	end
end

#njob_creation_paramsObject



76
77
78
# File 'lib/fairy/master/c-sort.rb', line 76

def njob_creation_params
	[@block_source]
end

#node_class_nameObject



72
73
74
# File 'lib/fairy/master/c-sort.rb', line 72

def node_class_name
	"PSort::PPreSort"
end