Class: Fairy::CDirectProduct::CPreFilter

Inherits:
Fairy::CIOFilter show all
Defined in:
lib/fairy/master/c-direct-product.rb

Overview

呼ばれない

def start_create_nodes
  @main_prefilter.start_create_nodes
end

Instance Attribute Summary

Attributes inherited from Fairy::CIOFilter

#input

Attributes included from Fairy::CInputtable

#input

Instance Method Summary collapse

Methods inherited from Fairy::CIOFilter

#node_class, #output=

Methods included from Fairy::CInputtable

#break_running, #inputtable?

Methods inherited from Fairy::CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CPreFilter

Returns a new instance of CPreFilter.



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fairy/master/c-direct-product.rb', line 90

def initialize(controller, opts, block_source)
	super
	@block_source = block_source

	@no = 0
	@exports = {}
	@exports_mutex = Mutex.new
#	@exports_cv = ConditionVariable.new

	@products = nil
	@products_mutex = Mutex.new
	@products_cv = ConditionVariable.new
end

Instance Method Details

#bind_export(exp, imp) ⇒ Object



181
182
183
# File 'lib/fairy/master/c-direct-product.rb', line 181

def bind_export(exp, imp)
	# do nothing
end

#each_assigned_filter(&block) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/fairy/master/c-direct-product.rb', line 137

def each_assigned_filter(&block)
	Thread.start do
	  @main.other_prefilters.each do |p| 
	    p.each_node do |n| 
 @exports_mutex.synchronize do 
		@exports[n] = n.exports.dc_dup
#		@exports_cv.broadcast
 end
	    end
	  end
	  @products_mutex.synchronize do 
	    @products = nodes.product(*@main.other_prefilters.collect{|p| p.nodes})
	    @products_cv.broadcast
	  end
	end

	super
end

#each_export_by(njob, mapper, &block) ⇒ Object

main prefilter 用



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fairy/master/c-direct-product.rb', line 157

def each_export_by(njob, mapper, &block)
	@exports_mutex.synchronize do
	  @exports[njob] = njob.exports.dc_dup
#	  @exports_cv.broadcast
	end
	@products_mutex.synchronize do
	  while !@products
	    @products_cv.wait(@products_mutex)
	  end
	  
	  post_njob_no = -1
	  @products.each do |main_njob, *others_njobs|
	    post_njob_no += 1
	    next if main_njob != njob
	    @others_njobs = others_njobs
	    
	    block.call(@exports[main_njob].shift,
   :init_njob => proc{|njob| 
			 njob.no = post_njob_no
			 njob.other_inputs = others_njobs.collect{|n| @exports[n].shift}})
	  end
	end
end

#main=(main) ⇒ Object



104
105
106
# File 'lib/fairy/master/c-direct-product.rb', line 104

def main=(main)
	@main = main
end

#njob_creation_paramsObject



112
113
114
# File 'lib/fairy/master/c-direct-product.rb', line 112

def njob_creation_params
	[@block_source]
end

#node_class_nameObject



108
109
110
# File 'lib/fairy/master/c-direct-product.rb', line 108

def node_class_name
	"PDirectProduct::PPreFilter"
end

#number_of_exportsObject



121
122
123
# File 'lib/fairy/master/c-direct-product.rb', line 121

def number_of_exports
	@main.no_of_exports_for_prefilter(self)
end

#number_of_nodes=(no) ⇒ Object



116
117
118
119
# File 'lib/fairy/master/c-direct-product.rb', line 116

def number_of_nodes=(no)
	super
	@main.update_prefilter_no_nodes(self)
end

#start_create_nodesObject



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fairy/master/c-direct-product.rb', line 125

def start_create_nodes
 	Log::debug self, "START_CREATE_NODES: #{self}"
 	@main.other_prefilters.each do |other|
 	  Thread.start do
other.each_assigned_filter do |input_filter|
  exp = input_filter.start_export
end
 	  end
 	end
	super
end