Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
5.0
- PROCESS_SHUTDOWN_SECONDS =
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/openc3/operators/operator.rb', line 236 def initialize Logger.level = Logger::INFO Logger.microservice_name = 'MicroserviceOperator' OperatorProcess.setup() @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds @ruby_process_name = ENV['OPENC3_RUBY'] if RUBY_ENGINE != 'ruby' @ruby_process_name ||= 'jruby' else @ruby_process_name ||= 'ruby' end @processes = {} @new_processes = {} @changed_processes = {} @removed_processes = {} @mutex = Mutex.new @shutdown = false @shutdown_complete = false end |
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
229 230 231 |
# File 'lib/openc3/operators/operator.rb', line 229 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
229 230 231 |
# File 'lib/openc3/operators/operator.rb', line 229 def processes @processes end |
Class Method Details
.instance ⇒ Object
392 393 394 |
# File 'lib/openc3/operators/operator.rb', line 392 def self.instance @@instance end |
.processes ⇒ Object
388 389 390 |
# File 'lib/openc3/operators/operator.rb', line 388 def self.processes @@instance.processes end |
.run ⇒ Object
383 384 385 386 |
# File 'lib/openc3/operators/operator.rb', line 383 def self.run @@instance = self.new @@instance.run end |
Instance Method Details
#remove_old ⇒ Object
287 288 289 290 291 292 293 294 295 |
# File 'lib/openc3/operators/operator.rb', line 287 def remove_old @mutex.synchronize do if @removed_processes.length > 0 Logger.info("Shutting down #{@removed_processes.length} removed microservices...") shutdown_processes(@removed_processes) @removed_processes = {} end end end |
#respawn_changed ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/openc3/operators/operator.rb', line 274 def respawn_changed @mutex.synchronize do if @changed_processes.length > 0 Logger.info("Cycling #{@changed_processes.length} changed microservices...") shutdown_processes(@changed_processes) break if @shutdown @changed_processes.each { |name, p| p.start } @changed_processes = {} end end end |
#respawn_dead ⇒ Object
297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/openc3/operators/operator.rb', line 297 def respawn_dead @mutex.synchronize do @processes.each do |name, p| break if @shutdown p.output_increment unless p.alive? # Respawn process output = p.extract_output Logger.error("Unexpected process died... respawning! #{p.cmd_line}\n#{output}\n", scope: p.scope) p.start end end end end |
#run ⇒ Object
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/openc3/operators/operator.rb', line 353 def run # Use at_exit to shutdown cleanly at_exit { shutdown() } # Monitor processes and respawn if died Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...") loop do update() remove_old() respawn_changed() start_new() respawn_dead() break if @shutdown sleep(@cycle_time) break if @shutdown end loop do break if @shutdown_complete sleep(0.1) end ensure Logger.info("#{self.class} shutdown complete") end |
#shutdown ⇒ Object
343 344 345 346 347 348 349 350 351 |
# File 'lib/openc3/operators/operator.rb', line 343 def shutdown @shutdown = true @mutex.synchronize do Logger.info("Shutting down processes...") shutdown_processes(@processes) Logger.info("Shutting down processes complete") @shutdown_complete = true end end |
#shutdown_processes(processes) ⇒ Object
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/openc3/operators/operator.rb', line 312 def shutdown_processes(processes) # Make a copy so we don't mutate original processes = processes.dup Logger.info("Commanding soft stops...") processes.each { |name, p| p.soft_stop } start_time = Time.now # Allow sufficient time for processes to shutdown cleanly while (Time.now - start_time) < PROCESS_SHUTDOWN_SECONDS processes_to_remove = [] processes.each do |name, p| unless p.alive? processes_to_remove << name Logger.info("Soft stop process successful: #{p.cmd_line}", scope: p.scope) end end processes_to_remove.each do |name| processes.delete(name) end if processes.length <= 0 Logger.info("Soft stop all successful") break end sleep(0.1) end if processes.length > 0 Logger.info("Commanding hard stops...") processes.each { |name, p| p.hard_stop } end end |
#start_new ⇒ Object
263 264 265 266 267 268 269 270 271 272 |
# File 'lib/openc3/operators/operator.rb', line 263 def start_new @mutex.synchronize do if @new_processes.length > 0 # Start all the processes Logger.info("#{self.class} starting each new process...") @new_processes.each { |name, p| p.start } @new_processes = {} end end end |
#stop ⇒ Object
379 380 381 |
# File 'lib/openc3/operators/operator.rb', line 379 def stop @shutdown = true end |
#update ⇒ Object
259 260 261 |
# File 'lib/openc3/operators/operator.rb', line 259 def update raise "Implement in subclass" end |