Class: Fairy::CBasicGroupBy

Inherits:
CIOFilter show all
Defined in:
lib/fairy/master/c-basic-group-by.rb

Instance Attribute Summary

Attributes inherited from CIOFilter

#input

Attributes included from CInputtable

#input

Instance Method Summary collapse

Methods inherited from CIOFilter

#node_class, #output=

Methods included from CInputtable

#break_running, #inputtable?

Methods inherited from CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CBasicGroupBy

Returns a new instance of CBasicGroupBy.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/fairy/master/c-basic-group-by.rb', line 13

def initialize(controller, opts, block_source)
  super
  @block_source = block_source

  @no_of_exports = 0

  # key -> [export, ...]
  @exports = {}
  @exports_mutex = Mutex.new
  @exports_cv = ConditionVariable.new

#      @pre_exports_queue = Queue.new
  @exports_queue = Queue.new

  @each_export_by_thread = nil
  @each_export_by_thread_mutex = Mutex.new
end

Instance Method Details

#add_exports(key, export, njob) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fairy/master/c-basic-group-by.rb', line 111

def add_exports(key, export, njob)
  @exports_mutex.synchronize do
	if exports = @exports[key]
	  export.output = exports.first.output if exports.first.output?
	  export.no = exports.first.no
	  exports.push export
	else
	  export.no = @no_of_exports
	  @no_of_exports += 1
	  @exports[key] = [export]
	  @exports_queue.push [export, njob]
#	  @pre_exports_queue.push [export, njob]
	end
  end
end

#all_node_arrived?Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/fairy/master/c-basic-group-by.rb', line 220

def all_node_arrived?
  @nodes_mutex.synchronize{@number_of_nodes}
end

#all_node_imported?Boolean

Returns:

  • (Boolean)


224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/fairy/master/c-basic-group-by.rb', line 224

def all_node_imported?
  # すべてのnjobがそろったか?
  return false unless @nodes_mutex.synchronize{@number_of_nodes}

  each_node(:exist_only) do |node|
	st = @nodes_status[node]
	# こちらはNG: outputが設定されていないとまずい.
	# すべてのnodeがそろったとしてもすべてのexportがそろっているとは限らない
#	unless [:ST_FINISH, :ST_EXPORT_FINISH, :ST_WAIT_EXPORT_FINISH, :ST_ALL_IMPORTED].include?(st)
	unless [:ST_FINISH, :ST_EXPORT_FINISH, :ST_WAIT_EXPORT_FINISH].include?(st)
	  return false
	end
  end
  true
end

#bind_export(exp, imp) ⇒ Object



107
108
109
# File 'lib/fairy/master/c-basic-group-by.rb', line 107

def bind_export(exp, imp)
  # do nothing
end

#each_assigned_filter(&block) ⇒ Object

def next_filter(mapper)

ret = super
unless ret

@each_export_by_thread_mutex.synchronize do @each_export_by_thread.join if @each_export_by_thread end

  end
  ret 
end


53
54
55
56
57
58
59
# File 'lib/fairy/master/c-basic-group-by.rb', line 53

def each_assigned_filter(&block)
  super

  @each_export_by_thread_mutex.synchronize do
	@each_export_by_thread.join if @each_export_by_thread
  end
end

#each_export_by(njob, mapper, &block) ⇒ Object

begin while pair = @exports_queue.pop exp, njob = pair Log::debug(self, “EXPORT_BY, #Fairy::CBasicGroupBy.expexp.key”) block.call exp end

rescue

Log::fatal_exception

  end
  @each_export_by_thread = true
end


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fairy/master/c-basic-group-by.rb', line 76

def each_export_by(njob, mapper, &block)
  @each_export_by_thread_mutex.synchronize do
	return if @each_export_by_thread

	@each_export_by_thread = Thread.start{
	  # すべての njob がそろうまで待つ
	  # 後段が先にスケジュールされてデッドロックするのを避けるため.
	  number_of_nodes

	  begin
 while pair = @exports_queue.pop
   exp, njob = pair
Log::debug(self, "EXPORT_BY, #{exp.key}")
   block.call exp

   @exports_mutex.synchronize do
		if @exports[exp.key].first == exp
@exports[exp.key][1..-1].each do |e|
  e.output = exp.output
end
		end
   end
 end
	  rescue
 Log::fatal_exception
 raise
	  end
	}
  end
end

#njob_creation_paramsObject



136
137
138
# File 'lib/fairy/master/c-basic-group-by.rb', line 136

def njob_creation_params
  [@block_source]
end

#node_class_nameObject



132
133
134
# File 'lib/fairy/master/c-basic-group-by.rb', line 132

def node_class_name
  "PBasicGroupBy"
end

#start_create_nodesObject



31
32
33
34
35
# File 'lib/fairy/master/c-basic-group-by.rb', line 31

def start_create_nodes
  super

  start_watch_all_node_imported
end

#start_watch_all_node_importedObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fairy/master/c-basic-group-by.rb', line 140

def start_watch_all_node_imported
  Thread.start do
	# すべての njob がそろうまで待つ
	# 後段が先にスケジュールされてデッドロックするのを避けるため.
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: S")
	number_of_nodes

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1")


Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 2")
	# すべての exports がそろうまで待つ
	@nodes_status_mutex.synchronize do
	  while !all_node_imported?
 @nodes_status_cv.wait(@nodes_status_mutex)
	  end
	end
	@exports_queue.push nil

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 4")
	for key, exports in @exports
	  exports.first.output_no_import = exports.size
	end
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: E")
  end
  nil
end

#start_watch_all_node_imported_ORGObject



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/fairy/master/c-basic-group-by.rb', line 168

def start_watch_all_node_imported_ORG
  Thread.start do
	# すべての njob がそろうまで待つ
	# 後段が先にスケジュールされてデッドロックするのを避けるため.
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: S")
	number_of_nodes

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1")
	# すでに存在するexportsを下流に送る
	@exports_mutex.synchronize do
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.1")
	  @pre_exports_queue.push nil
	  while pair = @pre_exports_queue.pop
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.2: EXP.NO: #{pair[0].no}")
 @exports_queue.push pair
	  end
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.3")
	end

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 2")
	# すべての exports がそろうまで待つ
	@nodes_status_mutex.synchronize do
	  while !all_node_imported?
 @nodes_status_cv.wait(@nodes_status_mutex)
	  end
	end

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 3")
	# 残りのexportsを下流に送る
	@pre_exports_queue.push nil
	while pair = @pre_exports_queue.pop
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 3.1: EXP.NO: #{pair[0].no}")
	  @exports_queue.push pair
	end
	@exports_queue.push nil
	
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 4")
#Log::debug(self, "START: setting for EXPOTRS.SIZE")
	for key, exports in @exports
#	  exports[1..-1].each do |exp|
#	    exp.output=exports.first.output
#	  end

#Log::debug(self, "EXPOTRS.SIZE=#{exports.size}")
	  exports.first.output_no_import = exports.size
	end
#Log::debug(self, "END: setting for EXPOTRS.SIZE")
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: E")
  end
  nil
end

#update_exports(key, export, njob) ⇒ Object



127
128
129
130
# File 'lib/fairy/master/c-basic-group-by.rb', line 127

def update_exports(key, export, njob)
  add_exports(key, export, njob)
  nil
end