Class: Fairy::Processor
- Inherits:
-
Object
- Object
- Fairy::Processor
- Defined in:
- lib/fairy/processor.rb
Constant Summary collapse
- EXPORTS =
[]
- LIMIT_PROCESS_SIZE =
kbyte
100
- ACTIVE_STATUS =
{ :ST_INIT => true, :ST_WAIT_IMPORT => true, :ST_ACTIVATE => true }
- SEMIACTIVE_STATUS =
{ # :ST_INIT => true, # :ST_WAIT_IMPORT => true, :ST_ALL_IMPORTED => true, :ST_WAIT_EXPORT_FINISH => true, :ST_EXPORT_FINISH => true, :ST_OUTPUT_FINISH => true }
Instance Attribute Summary collapse
-
#addr ⇒ Object
Returns the value of attribute addr.
-
#deepconnect ⇒ Object
readonly
Returns the value of attribute deepconnect.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#njob_mon ⇒ Object
readonly
Returns the value of attribute njob_mon.
-
#node ⇒ Object
readonly
Returns the value of attribute node.
-
#ntasks ⇒ Object
readonly
Returns the value of attribute ntasks.
Class Method Summary collapse
Instance Method Summary collapse
- #all_ntasks_finished?(lock = :lock) ⇒ Boolean
- #all_ntasks_finished_no_lock? ⇒ Boolean
- #all_ntasks_semiactivated?(lock = :lock) ⇒ Boolean
- #all_ntasks_semiactivated_no_lock? ⇒ Boolean
- #connect_controller(controller, conf) ⇒ Object
-
#create_import(policy) ⇒ Object
def create_njob(njob_class_name, bjob, opts, *rests) klass = import(njob_class_name) njob = klass.new(self, bjob, opts, *rests) @njobs.push njob Log.debugf(self, “Njob number of %d”, @njobs.size) njob end DeepConnect.def_method_spec(self, “REF create_njob(VAL, REF, VAL, *VAL)”).
- #create_ntask ⇒ Object
- #deregister_varray_element_proc ⇒ Object
- #exist_varray_elements? ⇒ Boolean
- #export(service, obj) ⇒ Object
- #import(service) ⇒ Object
-
#init_ntask_status_feature ⇒ Object
Processor status management and ntask status management. Processor statuses: ST_WAIT, ST_ACTIVATE.
-
#init_varray_feature ⇒ Object
varray management.
-
#initialize(id) ⇒ Processor
constructor
A new instance of Processor.
- #inspectx ⇒ Object
- #life_out_life_span? ⇒ Boolean
- #log_id ⇒ Object
- #no_active_ntasks ⇒ Object
- #no_ntasks ⇒ Object
- #notice_status(st) ⇒ Object
- #ntask_next_id ⇒ Object
- #process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON) ⇒ Object
- #register_varray_element(array) ⇒ Object
- #start(node_port, service = 0) ⇒ Object
-
#start_process_status_monitor ⇒ Object
Processor monitoring.
- #start_watch_status ⇒ Object
- #terminate ⇒ Object
- #terminate_all_ntasks ⇒ Object
- #to_s ⇒ Object
- #update_status(ntask, st) ⇒ Object
- #when_disconnected(deepspace, opts) ⇒ Object
Constructor Details
#initialize(id) ⇒ Processor
Returns a new instance of Processor.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fairy/processor.rb', line 54

# Builds a processor with empty bookkeeping: no exported services, no ntasks,
# an ntask id sequence positioned just before 0, and a fresh FiberMon.
# Finishes by delegating to the varray and ntask-status initializers.
#
# @param id [Object] identifier stored in @id
def initialize(id)
  @id, @reserve = id, 0
  @services = Hash.new
  @ntasks = Array.new

  # The sequence starts at -1 so the first id handed out is 0.
  @ntask_seq = -1
  @ntask_seq_mutex = Mutex.new

  @njob_mon = FiberMon.new

  init_varray_feature
  # Must follow the FiberMon creation: it asks @njob_mon for a condition variable.
  init_ntask_status_feature
end
Instance Attribute Details
#addr ⇒ Object
Returns the value of attribute addr.
177 178 179 |
# File 'lib/fairy/processor.rb', line 177

# Reader for the addr attribute.
#
# @return [Object] the current value of @addr
def addr
  @addr
end
#deepconnect ⇒ Object (readonly)
Returns the value of attribute deepconnect.
79 80 81 |
# File 'lib/fairy/processor.rb', line 79

# Read-only access to the deepconnect attribute.
#
# @return [Object] the current value of @deepconnect
def deepconnect
  @deepconnect
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
70 71 72 |
# File 'lib/fairy/processor.rb', line 70

# Read-only access to the id attribute.
#
# @return [Object] the current value of @id
def id
  @id
end
#njob_mon ⇒ Object (readonly)
Returns the value of attribute njob_mon.
73 74 75 |
# File 'lib/fairy/processor.rb', line 73

# Read-only access to the njob_mon attribute.
#
# @return [Object] the current value of @njob_mon
def njob_mon
  @njob_mon
end
#node ⇒ Object (readonly)
Returns the value of attribute node.
178 179 180 |
# File 'lib/fairy/processor.rb', line 178

# Read-only access to the node attribute.
#
# @return [Object] the current value of @node
def node
  @node
end
#ntasks ⇒ Object (readonly)
Returns the value of attribute ntasks.
71 72 73 |
# File 'lib/fairy/processor.rb', line 71

# Read-only access to the ntasks attribute.
#
# @return [Object] the current value of @ntasks
def ntasks
  @ntasks
end
Class Method Details
.def_export(obj, name = nil) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fairy/processor.rb', line 38

# Appends a [name, obj] pair to EXPORTS.
#
# When no name is supplied, one is derived from obj: for a Class, the part of
# its name following "Fairy::" if present, otherwise its full name. A
# non-Class obj without a name is reported through ERR::Raise.
#
# @param obj [Object] the object to export
# @param name [String, nil] explicit export name; derived from obj when falsy
# @return [Object] the result of EXPORTS.push
def Processor.def_export(obj, name = nil)
  unless name
    if obj.kind_of?(Class)
      matched = /Fairy::(.*)$/.match(obj.name)
      name = matched ? matched[1] : obj.name
    else
      ERR::Raise ERR::INTERNAL::CantDefExport, obj.to_s
    end
  end

  EXPORTS.push [name, obj]
end
Instance Method Details
#all_ntasks_finished?(lock = :lock) ⇒ Boolean
321 322 323 324 325 326 327 328 329 |
# File 'lib/fairy/processor.rb', line 321

# Answers whether every recorded ntask status is :ST_FINISH.
#
# @param lock [Symbol] :lock (default) wraps the check in
#   @status_cv.synchronize; any other value runs the check directly
# @return [Boolean] result of #all_ntasks_finished_no_lock?
def all_ntasks_finished?(lock = :lock)
  return all_ntasks_finished_no_lock? unless lock == :lock

  @status_cv.synchronize { all_ntasks_finished_no_lock? }
end
#all_ntasks_finished_no_lock? ⇒ Boolean
331 332 333 334 335 336 |
# File 'lib/fairy/processor.rb', line 331

# Checks, without taking any lock, that every status stored in @ntask_status
# is :ST_FINISH. An empty table counts as finished.
#
# @return [Boolean] true when no recorded status differs from :ST_FINISH
def all_ntasks_finished_no_lock?
  # Only the values matter; iterating them avoids an unused key variable.
  @ntask_status.each_value.all? { |status| status == :ST_FINISH }
end
#all_ntasks_semiactivated?(lock = :lock) ⇒ Boolean
338 339 340 341 342 343 344 345 346 |
# File 'lib/fairy/processor.rb', line 338

# Answers whether every recorded ntask status is listed in SEMIACTIVE_STATUS.
#
# @param lock [Symbol] :lock (default) wraps the check in
#   @status_cv.synchronize; any other value runs the check directly
# @return [Boolean] result of #all_ntasks_semiactivated_no_lock?
def all_ntasks_semiactivated?(lock = :lock)
  return all_ntasks_semiactivated_no_lock? unless lock == :lock

  @status_cv.synchronize { all_ntasks_semiactivated_no_lock? }
end
#all_ntasks_semiactivated_no_lock? ⇒ Boolean
348 349 350 351 352 353 |
# File 'lib/fairy/processor.rb', line 348

# Checks, without taking any lock, that every status stored in @ntask_status
# has a truthy entry in SEMIACTIVE_STATUS. An empty table counts as
# semi-activated.
#
# @return [Boolean] true when no recorded status is missing from SEMIACTIVE_STATUS
def all_ntasks_semiactivated_no_lock?
  # Only the values matter; iterating them avoids an unused key variable.
  @ntask_status.each_value.all? { |status| SEMIACTIVE_STATUS[status] }
end
#connect_controller(controller, conf) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/fairy/processor.rb', line 128

# Remembers the controller, installs the configuration it supplied, optionally
# starts the process status monitor, and redirects $stdout.
#
# @param controller [Object] stored in @controller and passed to Stdout.new
# @param conf [Object] configuration whose base_conf is set to CONF before it
#   is handed to Fairy::REPLACE_CONF
# @return [Object] the Stdout instance assigned to $stdout
def connect_controller(controller, conf)
  @controller = controller

  conf.base_conf = CONF
  Fairy.REPLACE_CONF(conf)
  # (disabled) Log::set_local_output_dev

  # CONF is consulted only after REPLACE_CONF has run.
  if CONF.PROCESSOR_MON_ON
    Log.info(self, "Processor Status Monitoring: ON")
    start_process_status_monitor
  end

  $stdout = Stdout.new(controller)
end
#create_import(policy) ⇒ Object
def create_njob(njob_class_name, bjob, opts, *rests)
klass = import(njob_class_name)
njob = klass.new(self, bjob, opts, *rests)
@njobs.push njob
Log.debugf(self, "Njob number of %d", @njobs.size)
njob
end
DeepConnect.def_method_spec(self, "REF create_njob(VAL, REF, VAL, *VAL)")
222 223 224 225 226 227 228 |
# File 'lib/fairy/processor.rb', line 222

# Creates an Import for the given policy and attaches a log callback that
# writes a verbose "IMPORT POP" line for each (n, key) it is called with.
#
# @param policy [Object] passed straight to Import.new
# @return [Import] the newly created import
def create_import(policy)
  Import.new(policy).tap do |new_import|
    new_import.set_log_callback do |n, key|
      Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
    end
  end
end
#create_ntask ⇒ Object
206 207 208 209 210 |
# File 'lib/fairy/processor.rb', line 206

# Creates a PTask with the next ntask id, records it in @ntasks and returns it.
#
# @return [PTask] the newly created task
def create_ntask
  PTask.new(ntask_next_id, self).tap { |task| @ntasks.push(task) }
end
#deregister_varray_element_proc ⇒ Object
275 276 277 278 279 280 281 |
# File 'lib/fairy/processor.rb', line 275

# Builds the callback that removes an object id from @varray_elements while
# holding @varray_elements_mutex. #register_varray_element installs it as a
# finalizer.
#
# @return [Proc] a proc taking one object id
def deregister_varray_element_proc
  proc { |oid| @varray_elements_mutex.synchronize { @varray_elements.delete(oid) } }
end
#exist_varray_elements? ⇒ Boolean
262 263 264 265 266 |
# File 'lib/fairy/processor.rb', line 262

# Reports, under @varray_elements_mutex, whether any varray element is still
# registered.
#
# @return [Boolean] true when @varray_elements is not empty
def exist_varray_elements?
  @varray_elements_mutex.synchronize { !@varray_elements.empty? }
end
#export(service, obj) ⇒ Object
184 185 186 |
# File 'lib/fairy/processor.rb', line 184

# Registers obj under the given service key in @services, replacing any
# previous entry for that key.
#
# @param service [Object] lookup key
# @param obj [Object] the object to expose
# @return [Object] obj
def export(service, obj)
  @services.store(service, obj)
end
#import(service) ⇒ Object
188 189 190 191 192 193 194 |
# File 'lib/fairy/processor.rb', line 188

# Looks up the object registered for a service key in @services.
# A missing (or falsy) entry is reported through ERR::Raise with
# ERR::INTERNAL::NoRegisterService.
#
# @param service [Object] lookup key
# @return [Object] the registered object
def import(service)
  registered = @services[service]
  ERR::Raise ERR::INTERNAL::NoRegisterService, service unless registered
  registered
end
#init_ntask_status_feature ⇒ Object
Processor status management and ntask status management. Processor statuses:
ST_WAIT
ST_ACTIVATE
289 290 291 292 293 294 295 296 |
# File 'lib/fairy/processor.rb', line 289

# Resets status tracking: an empty ntask status table, the processor status
# :ST_WAIT, and a condition variable obtained from @njob_mon.
#
# @return [Object] the condition variable stored in @status_cv
def init_ntask_status_feature
  @ntask_status = {}
  @status = :ST_WAIT
  # A plain "@status_mutex = Mutex.new" is disabled in the original source;
  # the condition variable from @njob_mon is used instead.
  @status_cv = @njob_mon.new_cv
end
#init_varray_feature ⇒ Object
varray management
257 258 259 260 |
# File 'lib/fairy/processor.rb', line 257

# Resets varray tracking: an empty element table and the mutex guarding it.
#
# @return [Mutex] the mutex stored in @varray_elements_mutex
def init_varray_feature
  @varray_elements = Hash.new
  @varray_elements_mutex = Mutex.new
end
#inspectx ⇒ Object
493 494 495 |
# File 'lib/fairy/processor.rb', line 493

# Extended inspection string: the processor's class and id followed by the
# class name of every ntask it holds.
#
# Reads @ntasks, the list the constructor creates and #create_ntask fills.
# The listing previously read @ntask, which is never assigned in this class
# and therefore raised NoMethodError on nil.
#
# @return [String] e.g. "#<Fairy::Processor: 3 [Fairy::PTask]>"
def inspectx
  task_classes = @ntasks.map { |ntask| ntask.class.name }.join(" ")
  "#<#{self.class}: #{id} [#{task_classes}]>"
end
#life_out_life_span? ⇒ Boolean
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/fairy/processor.rb', line 233

# Decides whether this processor process has outlived its life span.
#
# The answer is false while any ntask is unfinished or any varray element is
# still registered. Otherwise the vsz value printed by ps(1) for this process
# is compared against LIMIT_PROCESS_SIZE.
#
# @return [Boolean] true only when the process is idle and its vsz exceeds
#   LIMIT_PROCESS_SIZE
def life_out_life_span?
  return false if !all_ntasks_finished? || exist_varray_elements?

  # Stopgap measure: ask ps for this process's vsz.
  vsz = %x(ps -ovsz h#{Process.pid}).to_i
  vsz > LIMIT_PROCESS_SIZE
end
#log_id ⇒ Object
75 76 77 |
# File 'lib/fairy/processor.rb', line 75

# Short label built from @id in the form "Processor[<id>]".
#
# @return [String]
def log_id
  format("Processor[%s]", @id)
end
#no_active_ntasks ⇒ Object
313 314 315 316 317 318 319 |
# File 'lib/fairy/processor.rb', line 313

# Counts the ntasks whose recorded status has a truthy entry in ACTIVE_STATUS.
# No lock is taken here.
#
# @return [Integer] number of active ntasks (0 for an empty table)
def no_active_ntasks
  @ntask_status.each_value.count { |st| ACTIVE_STATUS[st] }
end
#no_ntasks ⇒ Object
202 203 204 |
# File 'lib/fairy/processor.rb', line 202

# Number of ntasks currently held in @ntasks.
#
# @return [Integer]
def no_ntasks
  @ntasks.length
end
#notice_status(st) ⇒ Object
426 427 428 |
# File 'lib/fairy/processor.rb', line 426

# Forwards a processor status to @node via update_processor_status,
# identifying this processor as the sender.
#
# @param st [Symbol] the status to report
# @return [Object] whatever @node.update_processor_status returns
def notice_status(st)
  @node.update_processor_status(self, st)
end
#ntask_next_id ⇒ Object
196 197 198 199 200 |
# File 'lib/fairy/processor.rb', line 196

# Advances the ntask sequence by one under @ntask_seq_mutex and returns the
# new value.
#
# @return [Integer] the id just allocated
def ntask_next_id
  @ntask_seq_mutex.synchronize { @ntask_seq = @ntask_seq + 1 }
end
#process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON) ⇒ Object
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 |
# File 'lib/fairy/processor.rb', line 448

# Writes one "PROCESS MONITOR" report to the log.
#
# The report always contains the ps(1) line for this process, formatted with
# CONF.PROCESSOR_MON_PSFORMAT. With inspect_p set it also contains, gathered
# after a forced GC: the number of live objects in total and per class, the
# number of DeepConnect export roots in total and per class, and the number
# of import references.
#
# Classes are ordered by name.to_s so that a class without a name (nil)
# cannot make the sort raise when mixed with named classes.
#
# @param inspect_p [Boolean] whether to include the ObjectSpace / DeepConnect
#   statistics; defaults to CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON
def process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON)
  if inspect_p
    GC.start

    count = 0
    count_by_class = Hash.new(0)
    ObjectSpace.each_object do |o|
      count += 1
      count_by_class[o.__deep_connect_real_class] += 1
    end

    exp = 0
    exp_by_class = Hash.new(0)
    imp = 0
    # Reaches into DeepConnect internals (@organizer, @export_roots,
    # @import_reference) through instance_eval.
    @deepconnect.instance_eval{@organizer}.deep_spaces.values.each do |ds|
      exp_roots = ds.instance_eval{@export_roots}
      exp += exp_roots.size
      exp_roots.each do |_key, v|
        exp_by_class[v.class] += 1
      end
      imp += ds.instance_eval{@import_reference.size}
    end
  end

  format = CONF.PROCESSOR_MON_PSFORMAT
  m = `ps -o#{format} h#{Process.pid}`.chomp
  Log::info(self) do |sio|
    sio.puts("PROCESS MONITOR:")
    sio.puts("#{Log.host} [P]\##{@id} MONITOR: PS: #{m}")
    if inspect_p
      sio.puts("#{Log.host} [P]\##{@id} MONITOR: OBJECT: #{count}")
      count_by_class.keys.sort_by { |k| k.name.to_s }.each do |klass|
        sio.puts("#{Log.host} [P]\##{@id} MONITOR: C: #{klass.name} => #{count_by_class[klass]}")
      end
      sio.puts("#{Log.host} [P]\##{@id} MONITOR: DEEP-CONNECT: exports: #{exp}")
      exp_by_class.keys.sort_by { |k| k.name.to_s }.each do |klass|
        sio.puts("#{Log.host} [P]\##{@id} MONITOR: C: #{klass.name} => #{exp_by_class[klass]}")
      end
      sio.puts("#{Log.host} [P]\##{@id} MONITOR: DEEP-CONNECT: imports: #{imp}")
    end
  end
end
#register_varray_element(array) ⇒ Object
268 269 270 271 272 273 |
# File 'lib/fairy/processor.rb', line 268

# Records array's object id in @varray_elements (under the mutex) and installs
# a finalizer on array that removes the entry again.
#
# @param array [Object] the varray element to track
# @return [Object] the result of ObjectSpace.define_finalizer
def register_varray_element(array)
  @varray_elements_mutex.synchronize do
    oid = array.object_id
    @varray_elements[oid] = oid
  end
  ObjectSpace.define_finalizer(array, deregister_varray_element_proc)
end
#start(node_port, service = 0) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fairy/processor.rb', line 81 def start(node_port, service=0) # if CONF.THREAD_STACK_SIZE # Process.setrlimit(Process::RLIMIT_STACK, CONF.THREAD_STACK_SIZE) # end @addr = nil @deepconnect = DeepConnect.start(service) @deepconnect.register_service("Processor", self) @deepconnect.when_disconnected do |deepspace, opts| when_disconnected(deepspace, opts) end for name, obj in EXPORTS export(name, obj) end @njob_mon.start require "fairy/share/inspector" @deepconnect.export("Inspector", Inspector.new(self)) require "fairy/share/log" @node_deepspace = @deepconnect.open_deepspace("localhost", node_port) @node = @node_deepspace.import("Node") @logger = @node.logger Log.type = "[P]" Log.pid =id Log.logger = @logger Log::info self, "Processor Service Start" Log::info(self, "\tfairy version: #{Version}") Log::info(self, "\t[Powered By #{RUBY_DESCRIPTION}]") start_watch_status # GC.disable # Thread.start do # loop do # sleep 60 # GC.start # end # end @node.register_processor(self) end |
#start_process_status_monitor ⇒ Object
Processor monitoring.
433 434 435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/fairy/processor.rb', line 433

# Starts a background thread that calls #process_status_mon forever, sleeping
# CONF.PROCESSOR_MON_INTERVAL before each call. The interval is read once,
# when the thread starts. An exception raised inside the loop is logged with
# Log::debug_exception and then re-raised.
#
# @return [Thread] the monitoring thread
def start_process_status_monitor
  Thread.start do
    begin
      interval = CONF.PROCESSOR_MON_INTERVAL
      loop do
        sleep(interval)
        process_status_mon
      end
    rescue
      Log.debug_exception
      raise
    end
  end
end
#start_watch_status ⇒ Object
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/fairy/processor.rb', line 399 def start_watch_status # 初期状態通知 notice_status(@status) @njob_mon.entry do @njob_mon.synchronize do old_status = nil old_no_active_ntasks = 0 loop do @status_cv.wait_while{ old_status == @status && old_no_active_ntasks == no_active_ntasks } no = no_active_ntasks if old_no_active_ntasks != no old_no_active_ntasks = no @controller.update_active_ntasks(self, no) end if old_status != @status old_status = @status notice_status(@status) end end end end nil end |
#terminate ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/fairy/processor.rb', line 144

# Shutdown handling for when the client has exited.
#
# Logs the request and returns immediately. A background thread then aborts
# every ntask, stops DeepConnect and calls Process.exit(0). Errors rescued
# along the way are only logged.
#
# @return [nil]
def terminate
  Log.info(self, "terminate!!")
  Thread.start do
    begin
      # Wait until this method has returned to its caller.
      sleep(0.2)
      @ntasks.each(&:abort_running)
      @deepconnect.stop
      Process.exit(0)
    rescue
      Log.debug(self, "Exception Rised in termination ntasks.")
      Log.debug_exception(self)
    end
  end
  nil
end
#terminate_all_ntasks ⇒ Object
163 164 165 166 167 168 169 170 171 |
# File 'lib/fairy/processor.rb', line 163

# Calls abort_running on every ntask in @ntasks. An error rescued while doing
# so is logged and not re-raised.
#
# @return [Object] @ntasks on success, otherwise the result of
#   Log::debug_exception
def terminate_all_ntasks
  Log.debug(self, "Terminate all ntasks!!")
  begin
    @ntasks.each(&:abort_running)
  rescue
    Log.debug(self, "Exception Rised in termination ntasks.")
    Log.debug_exception(self)
  end
end
#to_s ⇒ Object
497 498 499 |
# File 'lib/fairy/processor.rb', line 497

# Compact string form: the receiver's class followed by its id.
#
# @return [String] e.g. "#<Fairy::Processor: 3>"
def to_s
  format("#<%s: %s>", self.class, id)
end
#update_status(ntask, st) ⇒ Object
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 |
# File 'lib/fairy/processor.rb', line 356 def update_status(ntask, st) Log::debug(self, "UPDATE_STATUS: #{ntask}, #{st}") @njob_mon.synchronize do @ntask_status[ntask] = st case st when :ST_INIT # do nothing if all_ntasks_semiactivated?(:no_lock) Log::debug(self, "UPDATE_STATUS A: #{st}") @status = :ST_SEMIACTIVATE end when :ST_WAIT_IMPORT if all_ntasks_semiactivated?(:no_lock) Log::debug(self, "UPDATE_STATUS B: #{st}") @status = :ST_SEMIACTIVATE end when :ST_ACTIVATE Log::debug(self, "UPDATE_STATUS C: #{st}") @status = :ST_ACTIVATE when :ST_ALL_IMPORTED, :ST_WAIT_EXPORT_FINISH, :ST_EXPORT_FINISH, :ST_OUTPUT_FINISH if all_ntasks_semiactivated?(:no_lock) Log::debug(self, "UPDATE_STATUS D: #{st}") @status = :ST_SEMIACTIVATE end when :ST_FINISH if all_ntasks_finished?(:no_lock) Log::debug(self, "UPDATE_STATUS E: #{st}") @status = :ST_WAIT end else if @status == :ST_WAIT Log::debug(self, "UPDATE_STATUS F: #{st}") @status = :ST_ACTIVATE end end @status_cv.broadcast end end |
#when_disconnected(deepspace, opts) ⇒ Object
173 174 175 |
# File 'lib/fairy/processor.rb', line 173

# Disconnect callback installed in #start. It only writes a debug log line
# naming the peer uuid of the deepspace that went away.
#
# @param deepspace [Object] must respond to peer_uuid
# @param opts [Object] unused
# @return [Object] the result of Log::debug
def when_disconnected(deepspace, opts)
  Log.debug(self, "PROCESSOR: disconnected #{deepspace.peer_uuid}")
end