Class: Fairy::CHere
Instance Attribute Summary
Attributes inherited from CIOFilter
#input
Attributes included from CInputtable
#input
Instance Method Summary
collapse
Methods inherited from CIOFilter
#node_class, #output=
#break_running, #initialize, #inputtable?
Methods inherited from CFilter
#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #break_running, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #initialize, #input, #job_pool_dict, #job_pool_variable, #njob_creation_params, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_create_nodes, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?
Instance Method Details
#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/fairy/master/c-here.rb', line 20
# Creates a node for +ntask+ through create_node and wires it to +mapper+.
#
# Inside the block handed to create_node, the optional callback stored
# under opts[:init_njob] is invoked with the node first, then the node is
# passed to mapper.bind_input.
#
# ntask  - forwarded unchanged to create_node.
# mapper - object responding to bind_input(node).
# opts   - Hash; only :init_njob (a callable taking the node) is read here.
#
# Returns whatever create_node returns.
# NOTE(review): the "add" part of the name is presumably handled inside
# create_node -- nothing in this method adds the node explicitly; confirm.
def create_and_add_node(ntask, mapper, opts={})
  create_node(ntask) do |fresh_node|
    init_hook = opts[:init_njob]
    init_hook.call(fresh_node) if init_hook
    mapper.bind_input(fresh_node)
  end
end
|
#each(&block) ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/fairy/master/c-here.rb', line 31
# Streams every element exported by every node to +block+, one at a time.
#
# For each node yielded by each_node: the node's export is started, a new
# Import is built with @opts[:prequeuing_policy], given a verbose-logging
# callback, has no_import set to 1, and is installed as the output of the
# node's export. The import is then drained, calling +block+ per element.
#
# Returns whatever each_node returns.
def each(&block)
  prequeuing = @opts[:prequeuing_policy]
  each_node do |source_node|
    source_node.start_export

    importer = Import.new(prequeuing)
    importer.set_log_callback { |n, key| Log::verbose(self, "IMPORT POP key=#{key}: #{n}") }
    importer.no_import = 1
    source_node.export.output = importer

    importer.each { |element| block.call(element) }
  end
end
|
#each_buf(&block) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/fairy/master/c-here.rb', line 48
# Streams the elements exported by every node to +block+ in batches.
#
# Elements are accumulated across nodes into an Array. As soon as the
# array holds more than the threshold (@opts[:pool_threshold], falling back
# to CONF.HERE_POOL_THRESHOLD) it is handed to +block+; any remainder is
# handed over after the last node has been drained. Nothing is yielded when
# no element was produced.
#
# Each batch is a distinct Array: once it has been passed to +block+ this
# method never touches it again, so the consumer may keep or queue it.
# (Previously the same array was cleared and refilled after every call,
# which silently emptied any batch the consumer had retained.)
#
# NOTE(review): the comparison is `>`, so a full batch carries
# threshold + 1 elements -- kept as is; confirm whether that is intended.
#
# Returns the result of the final block call, or nil when there was no
# trailing batch.
def each_buf(&block)
  threshold = @opts[:pool_threshold] || CONF.HERE_POOL_THRESHOLD
  chunk = []
  policy = @opts[:prequeuing_policy]
  each_node do |node|
    node.start_export
    import = Import.new(policy)
    import.set_log_callback do |n, key|
      Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
    end
    import.no_import = 1
    node.export.output = import
    import.each do |e|
      chunk.push e
      next unless chunk.size > threshold

      block.call chunk
      # Start a new buffer instead of clearing the one just handed out.
      chunk = []
    end
  end
  block.call chunk unless chunk.empty?
end
|
#node_class_name ⇒ Object
16
17
18
|
# File 'lib/fairy/master/c-here.rb', line 16
# Returns the name of the node class paired with this filter, as a String.
# NOTE(review): presumably consumed by the inherited node_class lookup --
# confirm against CIOFilter.
def node_class_name
  'PHere'
end
|