Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
5.0
- PROCESS_SHUTDOWN_SECONDS =
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/openc3/operators/operator.rb', line 236

# Configures logging, runs OperatorProcess.setup, reads settings from the
# environment and initializes the operator's (empty) bookkeeping state.
#
# Environment variables read:
# - OPERATOR_CYCLE_TIME: when set, converted with String#to_f and used as the
#   cycle time; otherwise CYCLE_TIME is used.
# - OPENC3_RUBY: when set, stored as @ruby_process_name; otherwise 'ruby' on
#   the 'ruby' engine and 'jruby' on any other RUBY_ENGINE.
def initialize
  Logger.level = Logger::INFO
  Logger.microservice_name = 'MicroserviceOperator'

  OperatorProcess.setup

  configured_cycle = ENV['OPERATOR_CYCLE_TIME']
  @cycle_time = configured_cycle ? configured_cycle.to_f : CYCLE_TIME # time in seconds

  engine_default = RUBY_ENGINE == 'ruby' ? 'ruby' : 'jruby'
  @ruby_process_name = ENV['OPENC3_RUBY'] || engine_default

  # Process tables (name => process object). @new_processes, @changed_processes
  # and @removed_processes are drained by #start_new, #respawn_changed and
  # #remove_old respectively; @processes is what #respawn_dead and #shutdown
  # operate on.
  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}

  # Guards the process tables; taken by every method that touches them.
  @mutex = Mutex.new

  # @shutdown requests the main loop to end; @shutdown_complete is set by
  # #shutdown once the processes have been stopped.
  @shutdown = false
  @shutdown_complete = false
end
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
229 230 231 |
# File 'lib/openc3/operators/operator.rb', line 229

# Reader for the cycle time.
#
# @return [Object] the current value of @cycle_time
def cycle_time
  @cycle_time
end
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
229 230 231 |
# File 'lib/openc3/operators/operator.rb', line 229

# Reader for the process table.
#
# @return [Object] the current value of @processes
def processes
  @processes
end
Class Method Details
.instance ⇒ Object
389 390 391 |
# File 'lib/openc3/operators/operator.rb', line 389

# Class-level accessor for the operator stored in @@instance.
#
# @return [Object] the value of @@instance (assigned by .run)
def self.instance
  @@instance
end
.processes ⇒ Object
385 386 387 |
# File 'lib/openc3/operators/operator.rb', line 385

# Convenience accessor that forwards to #processes on the operator stored
# in @@instance.
#
# @return [Object] whatever @@instance.processes returns
def self.processes
  @@instance.processes
end
.run ⇒ Object
380 381 382 383 |
# File 'lib/openc3/operators/operator.rb', line 380

# Creates an operator, records it in @@instance, and calls its #run method.
#
# @return [Object] whatever the instance's #run returns
def self.run
  operator = new
  @@instance = operator
  operator.run
end
Instance Method Details
#remove_old ⇒ Object
287 288 289 290 291 292 293 294 295 |
# File 'lib/openc3/operators/operator.rb', line 287

# Stops every process queued in @removed_processes and then clears the queue.
# Does nothing when the queue is empty. Runs while holding @mutex.
def remove_old
  @mutex.synchronize do
    next if @removed_processes.empty?

    Logger.info("Shutting down #{@removed_processes.length} removed microservices...")
    shutdown_processes(@removed_processes)
    @removed_processes = {}
  end
end
#respawn_changed ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/openc3/operators/operator.rb', line 274

# Cycles the processes queued in @changed_processes: stops them via
# #shutdown_processes, starts each entry of the queue again, and clears the
# queue. Does nothing when the queue is empty. Runs while holding @mutex.
def respawn_changed
  @mutex.synchronize do
    next if @changed_processes.empty?

    Logger.info("Cycling #{@changed_processes.length} changed microservices...")
    shutdown_processes(@changed_processes)

    # @shutdown was set while stopping: do not start anything again and
    # leave the queue as it is.
    next if @shutdown

    @changed_processes.each_value(&:start)
    @changed_processes = {}
  end
end
#respawn_dead ⇒ Object
297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/openc3/operators/operator.rb', line 297

# Walks @processes while holding @mutex. For each process it calls
# output_increment, and if the process is no longer alive it logs an error
# (including the process's extracted output) and starts it again.
# The walk stops early as soon as @shutdown is set.
def respawn_dead
  @mutex.synchronize do
    @processes.each_value do |process|
      break if @shutdown

      process.output_increment
      next if process.alive?

      # Respawn process
      captured = process.extract_output
      Logger.error("Unexpected process died... respawning! #{process.cmd_line}\n#{captured}\n", scope: process.scope)
      process.start
    end
  end
end
#run ⇒ Object
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/openc3/operators/operator.rb', line 350

# Main loop of the operator.
#
# Registers an at_exit hook that calls #shutdown, then repeatedly performs one
# monitoring pass (#update, #remove_old, #respawn_changed, #start_new,
# #respawn_dead) followed by a sleep of @cycle_time seconds, until @shutdown
# is set. Afterwards it polls every 0.1 seconds until @shutdown_complete is
# set (which #shutdown does). A final log line is always written on the way
# out, including when an exception propagates.
def run
  # Use at_exit to shutdown cleanly
  at_exit { shutdown }

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update
    remove_old
    respawn_changed
    start_new
    respawn_dead
    # Checked both before and after sleeping so a stop request is honored
    # without starting another pass.
    break if @shutdown

    sleep(@cycle_time)
    break if @shutdown
  end

  # Wait for #shutdown to finish stopping the processes.
  sleep(0.1) until @shutdown_complete
ensure
  Logger.info("#{self.class} shutdown complete")
end
#shutdown ⇒ Object
340 341 342 343 344 345 346 347 348 |
# File 'lib/openc3/operators/operator.rb', line 340

# Requests shutdown and stops every process in @processes.
#
# @shutdown is set before @mutex is taken, so loops in other methods that
# check the flag can bail out; @shutdown_complete is set once
# #shutdown_processes has returned.
def shutdown
  @shutdown = true

  @mutex.synchronize do
    Logger.info('Shutting down processes...')
    shutdown_processes(@processes)
    Logger.info('Shutting down processes complete')
    @shutdown_complete = true
  end
end
#shutdown_processes(processes) ⇒ Object
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/openc3/operators/operator.rb', line 312

# Stops the given processes: first asks each one to stop softly, waits up to
# PROCESS_SHUTDOWN_SECONDS for them to exit (polling every 0.1 seconds), and
# then hard-stops whatever is still alive.
#
# The hash passed in is never modified. Callers such as #respawn_changed
# iterate the same hash afterwards to start the processes again, so removing
# entries from it here would make them skip every process that stopped cleanly.
#
# @param processes [Hash] process name => process object; each process must
#   respond to soft_stop, hard_stop, alive?, cmd_line and scope
# @return [void]
def shutdown_processes(processes)
  # Work on a shallow copy so the bookkeeping below does not mutate the
  # caller's hash (the process objects themselves are shared).
  remaining = processes.dup

  Logger.info("Commanding soft stops...")
  remaining.each_value(&:soft_stop)

  # Allow sufficient time for processes to shutdown cleanly. A monotonic clock
  # is used so wall-clock adjustments cannot shorten or extend the wait.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + PROCESS_SHUTDOWN_SECONDS
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    # Drop (and report) every process that has exited since the last poll.
    remaining.delete_if do |_name, process|
      next false if process.alive?

      Logger.info("Soft stop process successful: #{process.cmd_line}", scope: process.scope)
      true
    end

    if remaining.empty?
      Logger.info("Soft stop all successful")
      break
    end
    sleep(0.1)
  end

  unless remaining.empty?
    Logger.info("Commanding hard stops...")
    remaining.each_value(&:hard_stop)
  end
end
#start_new ⇒ Object
263 264 265 266 267 268 269 270 271 272 |
# File 'lib/openc3/operators/operator.rb', line 263

# Starts every process queued in @new_processes and then clears the queue.
# Does nothing when the queue is empty. Runs while holding @mutex.
def start_new
  @mutex.synchronize do
    next if @new_processes.empty?

    # Start all the processes
    Logger.info("#{self.class} starting each new process...")
    @new_processes.each_value(&:start)
    @new_processes = {}
  end
end
#stop ⇒ Object
376 377 378 |
# File 'lib/openc3/operators/operator.rb', line 376

# Sets the @shutdown flag, which the loop in #run checks to end monitoring.
# Unlike #shutdown, this does not stop any processes itself.
#
# @return [true]
def stop
  @shutdown = true
end
#update ⇒ Object
259 260 261 |
# File 'lib/openc3/operators/operator.rb', line 259

# Hook called at the start of every monitoring pass in #run. This base
# implementation always raises; subclasses are expected to override it.
#
# @raise [RuntimeError] always, with the message "Implement in subclass"
def update
  raise RuntimeError, "Implement in subclass"
end