Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
Cycle time, in seconds, between checks for new microservices.
5.0
- PROCESS_SHUTDOWN_SECONDS =
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/openc3/operators/operator.rb', line 234
# Builds an operator with empty process tables and the shutdown flags cleared.
#
# Configures the logger, lets OperatorProcess perform its own setup, then
# reads two optional environment variables:
# * OPERATOR_CYCLE_TIME - monitoring period in seconds (defaults to CYCLE_TIME)
# * OPENC3_RUBY - interpreter name (defaults to 'ruby' on MRI, 'jruby' otherwise)
def initialize
  Logger.level = Logger::INFO
  Logger.microservice_name = 'MicroserviceOperator'
  OperatorProcess.setup()

  # time in seconds; String#to_f always yields a Float, so the fallback to
  # CYCLE_TIME only applies when the variable is not set at all
  configured_cycle = ENV['OPERATOR_CYCLE_TIME']
  @cycle_time = configured_cycle ? configured_cycle.to_f : CYCLE_TIME

  fallback_interpreter = RUBY_ENGINE == 'ruby' ? 'ruby' : 'jruby'
  @ruby_process_name = ENV['OPENC3_RUBY'] || fallback_interpreter

  @mutex = Mutex.new
  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}
  @shutdown = false
  @shutdown_complete = false
end
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
227 228 229 |
# File 'lib/openc3/operators/operator.rb', line 227 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
227 228 229 |
# File 'lib/openc3/operators/operator.rb', line 227 def processes @processes end |
Class Method Details
.instance ⇒ Object
385 386 387 |
# File 'lib/openc3/operators/operator.rb', line 385 def self.instance @@instance end |
.processes ⇒ Object
381 382 383 |
# File 'lib/openc3/operators/operator.rb', line 381 def self.processes @@instance.processes end |
.run ⇒ Object
376 377 378 379 |
# File 'lib/openc3/operators/operator.rb', line 376
# Creates an operator of the receiving class, records it as the shared
# instance, and enters its monitoring loop. The call only returns once
# that loop has finished.
def self.run
  operator = new
  @@instance = operator
  operator.run
end
Instance Method Details
#remove_old ⇒ Object
285 286 287 288 289 290 291 292 293 |
# File 'lib/openc3/operators/operator.rb', line 285
# Stops every microservice queued in @removed_processes and then clears the
# queue. Runs under the operator mutex; does nothing when the queue is empty.
def remove_old
  @mutex.synchronize do
    next if @removed_processes.empty?

    Logger.info("Shutting down #{@removed_processes.length} removed microservices...")
    shutdown_processes(@removed_processes)
    @removed_processes = {}
  end
end
#respawn_changed ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/openc3/operators/operator.rb', line 272
# Restarts every microservice queued in @changed_processes: each one is
# stopped via #shutdown_processes and then started again, after which the
# queue is cleared. Runs under the operator mutex.
#
# If a shutdown is requested while the processes are being stopped, nothing
# is restarted and the queue is left as it is.
def respawn_changed
  @mutex.synchronize do
    next if @changed_processes.empty?

    Logger.info("Cycling #{@changed_processes.length} changed microservices...")
    # Hand over a copy: shutdown_processes removes entries from the hash it
    # is given as they exit, which would otherwise leave nothing to restart.
    shutdown_processes(@changed_processes.dup)
    next if @shutdown

    @changed_processes.each_value { |process| process.start }
    @changed_processes = {}
  end
end
#respawn_dead ⇒ Object
295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/openc3/operators/operator.rb', line 295
# Walks the monitored processes under the operator mutex and restarts any
# that are no longer alive, logging the output extracted from the dead
# process. The walk is abandoned as soon as a shutdown has been requested.
def respawn_dead
  @mutex.synchronize do
    @processes.each_value do |process|
      break if @shutdown

      process.output_increment
      next if process.alive?

      # Respawn process
      captured = process.extract_output
      Logger.error("Unexpected process died... respawning! #{process.cmd_line}\n#{captured}\n", scope: process.scope)
      process.start
    end
  end
end
#run ⇒ Object
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/openc3/operators/operator.rb', line 338
# Monitoring loop of the operator.
#
# Registers an at_exit hook that stops all monitored processes, then repeats
# update / remove_old / respawn_changed / start_new / respawn_dead every
# @cycle_time seconds until @shutdown is set. Afterwards it blocks until the
# at_exit hook has reported completion through @shutdown_complete.
def run
  # Use at_exit to shutdown cleanly
  at_exit do
    @shutdown = true
    @mutex.synchronize do
      Logger.info("Shutting down processes...")
      shutdown_processes(@processes)
      Logger.info("Shutting down processes complete")
      @shutdown_complete = true
    end
  end

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update
    remove_old
    respawn_changed
    start_new
    respawn_dead
    break if @shutdown

    sleep(@cycle_time)
    break if @shutdown
  end

  # @shutdown_complete is only set by the at_exit hook above, so this waits
  # for the interpreter to begin exiting.
  sleep(0.1) until @shutdown_complete
ensure
  Logger.info("#{self.class} shutdown complete")
end
#shutdown_processes(processes) ⇒ Object
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/openc3/operators/operator.rb', line 310
# Stops the given processes, politely first and forcefully if needed.
#
# Every process receives soft_stop. The method then polls for up to
# PROCESS_SHUTDOWN_SECONDS, and any process still alive after that window
# receives hard_stop.
#
# @param processes [Hash] name => process; each process must respond to
#   soft_stop, hard_stop, alive?, cmd_line and scope. The hash itself is
#   not modified.
def shutdown_processes(processes)
  # Track progress on a copy so the caller's hash is left untouched; callers
  # such as respawn_changed still need its entries after the stop.
  remaining = processes.dup

  Logger.info("Commanding soft stops...")
  remaining.each_value { |process| process.soft_stop }

  # Allow sufficient time for processes to shutdown cleanly. A monotonic
  # clock keeps the window correct even if the wall clock is adjusted.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + PROCESS_SHUTDOWN_SECONDS
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    remaining.delete_if do |_name, process|
      next false if process.alive?

      Logger.info("Soft stop process successful: #{process.cmd_line}", scope: process.scope)
      true
    end

    if remaining.empty?
      Logger.info("Soft stop all successful")
      break
    end
    sleep(0.1)
  end

  return if remaining.empty?

  Logger.info("Commanding hard stops...")
  remaining.each_value { |process| process.hard_stop }
end
#start_new ⇒ Object
261 262 263 264 265 266 267 268 269 270 |
# File 'lib/openc3/operators/operator.rb', line 261
# Starts every process queued in @new_processes and then clears the queue.
# Runs under the operator mutex; does nothing when the queue is empty.
def start_new
  @mutex.synchronize do
    next if @new_processes.empty?

    # Start all the processes
    Logger.info("#{self.class} starting each new process...")
    @new_processes.each_value { |process| process.start }
    @new_processes = {}
  end
end
#stop ⇒ Object
372 373 374 |
# File 'lib/openc3/operators/operator.rb', line 372 def stop @shutdown = true end |
#update ⇒ Object
257 258 259 |
# File 'lib/openc3/operators/operator.rb', line 257 def update raise "Implement in subclass" end |