Class: Fairy::CSegZip
Defined Under Namespace
Classes: CPreSegZipFilter, NoAllFilter
Constant Summary
collapse
- ZIP_BY_SEGMENT =
:ZIP_BY_SEGMENT
Instance Attribute Summary
Attributes inherited from CIOFilter
#input
Attributes included from CInputtable
#input
Instance Method Summary
collapse
Methods inherited from CIOFilter
#node_class, #output=
#inputtable?
Methods inherited from CFilter
#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?
Constructor Details
#initialize(controller, opts, others, block_source) ⇒ CSegZip
Returns a new instance of CSegZip.
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/fairy/master/c-seg-zip.rb', line 19
# Builds the filter and prepares the bookkeeping used to pair this filter's
# nodes with the exports of the other filters.
#
# controller, opts, others and block_source are all forwarded to the
# superclass initializer by the bare +super+.
def initialize(controller, opts, others, block_source)
super

# Collaborators handed in by the caller.
@block_source = block_source
@others = others

# Export table (indexed by filter number in start_create_nodes) and the
# per-"other" completion flags.
@exports = []
@others_status = {}

# Lock and condition variable used together when touching the two
# structures above.
@exports_cv = XThread::ConditionVariable.new
@exports_mutex = Mutex.new
end
|
Instance Method Details
#break_running ⇒ Object
112
113
114
115
|
# File 'lib/fairy/master/c-seg-zip.rb', line 112
# Runs the inherited break_running, then asks every filter in @others to
# break as well, each from its own freshly started thread (the threads are
# not joined here).
def break_running
super
@others.each do |peer|
Thread.start(peer) { |target| target.break_running }
end
end
|
#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/fairy/master/c-seg-zip.rb', line 68
# Creates a node for +ntask+ through create_node and wires it up inside the
# block passed to create_node.
#
# ntask  - passed straight to create_node.
# mapper - receives bind_input(node) for the node being set up.
# opts   - optional hash; when opts[:init_njob] is set it is called with the
#          node before anything else happens.
#
# Raises via ERR::Raise(ERR::NoImplement) when opt_zip_by_substream? is falsy.
# Returns whatever create_node returns.
def create_and_add_node(ntask, mapper, opts={})
ERR::Raise(ERR::NoImplement, "except zip_by_segment") unless opt_zip_by_substream?

create_node(ntask) do |njob|
  initializer = opts[:init_njob]
  initializer.call(njob) if initializer

  mapper.bind_input(njob)
  seq = njob.no

  # Block on the condition variable until other_filter_of yields a
  # non-nil list for this node number.
  peer_exports = nil
  @exports_mutex.synchronize do
    until (peer_exports = other_filter_of(seq))
      @exports_cv.wait(@exports_mutex)
    end
  end

  njob.zip_inputs = peer_exports

  # Pair each export with the node's import at the same position.
  peer_exports.zip(njob.zip_imports) do |export, import|
    export.output = import
    import.no_import = 1
  end
end
end
|
#njob_creation_params ⇒ Object
39
40
41
|
# File 'lib/fairy/master/c-seg-zip.rb', line 39
# Returns a new one-element array holding the block source given to the
# constructor.
def njob_creation_params
params = []
params << @block_source
params
end
|
#node_class_name ⇒ Object
35
36
37
|
# File 'lib/fairy/master/c-seg-zip.rb', line 35
# Returns the name of the node class associated with this filter, as a
# String.
def node_class_name
name = "PSegZip"
name
end
|
#opt_zip_by_substream? ⇒ Boolean
31
32
33
|
# File 'lib/fairy/master/c-seg-zip.rb', line 31
# Returns the value stored in @opts under ZIP_BY_SEGMENT (nil/false when the
# option is absent or disabled). Note that the method is named "substream"
# while the option key it reads is ZIP_BY_SEGMENT.
def opt_zip_by_substream?
option_key = ZIP_BY_SEGMENT
@opts[option_key]
end
|
#other_filter_of(no) ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/fairy/master/c-seg-zip.rb', line 95
# Looks up, for node number +no+, the export registered by each filter in
# @others (result is in @others order).
#
# Returns nil when the answer is not available yet, i.e. when
#   * nothing has been registered under +no+ in @exports, or
#   * some filter has no export for +no+ and is not yet flagged in
#     @others_status.
# Otherwise returns an Array with one entry per element of @others; an entry
# is nil when that filter is flagged in @others_status but registered no
# export for +no+.
#
# The previous implementation read @other_status (a misspelling of
# @others_status, which is what initialize and start_create_nodes use); the
# resulting NoMethodError on nil was swallowed by a bare rescue, so a
# missing export always meant "not ready". The lookup is now done with
# explicit nil checks instead of exceptions.
def other_filter_of(no)
slot = @exports[no]
return nil unless slot

collected = []
@others.each do |o|
  exp = slot[o]
  # No export from this filter: only acceptable once it is flagged as
  # finished; otherwise the caller has to keep waiting.
  return nil unless exp || @others_status[o]
  collected << exp
end
collected
end
|
#start_create_nodes ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/fairy/master/c-seg-zip.rb', line 43
# Starts one thread per filter in @others that records that filter's exports
# into @exports, then delegates to the inherited start_create_nodes.
#
# For every assigned input filter the thread, while holding @exports_mutex,
# makes sure a hash exists at @exports[input_filter.no] (broadcasting on
# @exports_cv when it had to create one), calls start_export on the input
# filter and stores the result under the owning "other" filter. After the
# iteration it marks that filter in @others_status and broadcasts again.
#
# NOTE(review): storing an export into an already existing hash does not
# broadcast; waiters are next woken by a later hash creation or by the
# status broadcast — confirm this is intended.
def start_create_nodes
Log::debug(self, "START_CREATE_NODES: #{self}")

@others.each { |peer|
  Thread.start {
    peer.each_assigned_filter { |input_filter|
      @exports_mutex.synchronize {
        slot = @exports[input_filter.no]
        unless slot
          slot = @exports[input_filter.no] = {}
          @exports_cv.broadcast
        end
        slot[peer] = input_filter.start_export
      }
    }

    # All assigned filters of this peer have been visited.
    @exports_mutex.synchronize {
      @others_status[peer] = true
      @exports_cv.broadcast
    }
  }
}

super
end
|