Class: Fairy::CSegZip

Inherits:
CIOFilter show all
Defined in:
lib/fairy/master/c-seg-zip.rb

Defined Under Namespace

Classes: CPreSegZipFilter, NoAllFilter

Constant Summary collapse

ZIP_BY_SEGMENT =
:ZIP_BY_SEGMENT

Instance Attribute Summary

Attributes inherited from CIOFilter

#input

Attributes included from CInputtable

#input

Instance Method Summary collapse

Methods inherited from CIOFilter

#node_class, #output=

Methods included from CInputtable

#inputtable?

Methods inherited from CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, others, block_source) ⇒ CSegZip

Returns a new instance of CSegZip.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fairy/master/c-seg-zip.rb', line 17

def initialize(controller, opts, others, block_source)
  super
  @others = others
  @block_source = block_source

  #@exports = [{o=>filter, ...}, ...]
  @exports = []
  @others_status = {}
  @exports_mutex = Mutex.new
  @exports_cv = ConditionVariable.new
end

Instance Method Details

#break_runningObject



110
111
112
113
# File 'lib/fairy/master/c-seg-zip.rb', line 110

def break_running
  super
  @others.each{|others| Thread.start{others.break_running}}
end

#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fairy/master/c-seg-zip.rb', line 66

def create_and_add_node(ntask, mapper, opts={})
  unless opt_zip_by_substream?
 	ERR::Raise ERR::NoImplement, "except zip_by_segment"
  end

  node = create_node(ntask) {|node|
	if opts[:init_njob]
	  opts[:init_njob].call(node)
	end
	mapper.bind_input(node)

	no = node.no
	exps = nil
	@exports_mutex.synchronize do
	  while !(exps = other_filter_of(no))
 @exports_cv.wait(@exports_mutex)
	  end
	end
 	node.zip_inputs = exps
 	exps.zip(node.zip_imports) do |other, import| 
	  other.output = import
	  import.no_import = 1
	end
  }
  node
end

#njob_creation_paramsObject



37
38
39
# File 'lib/fairy/master/c-seg-zip.rb', line 37

def njob_creation_params
  [@block_source]
end

#node_class_nameObject



33
34
35
# File 'lib/fairy/master/c-seg-zip.rb', line 33

def node_class_name
  "PSegZip"
end

#opt_zip_by_substream?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fairy/master/c-seg-zip.rb', line 29

def opt_zip_by_substream?
  @opts[ZIP_BY_SEGMENT]
end

#other_filter_of(no) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fairy/master/c-seg-zip.rb', line 93

def other_filter_of(no)
  begin
	@others.collect do |o| 
	  unless exp = @exports[no][o]
 unless @other_status[o]
   raise NoAllFilter
 end
	  end
	  exp
	end
  rescue NoAllFilter
	return nil
  rescue
	return nil
  end
end

#start_create_nodesObject



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fairy/master/c-seg-zip.rb', line 41

def start_create_nodes
  Log::debug self, "START_CREATE_NODES: #{self}"
  @others.each do |other|
	Thread.start do
	  other.each_assigned_filter do |input_filter|
 @exports_mutex.synchronize do
   unless exps = @exports[input_filter.no]
		exps = @exports[input_filter.no] = {}
		@exports_cv.broadcast
   end
   exp = input_filter.start_export
   exps[other] = exp
 end
	  end
	  @exports_mutex.synchronize do
 @others_status[other] = true
 @exports_cv.broadcast
	  end
	end
  end
  super
end