Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
5.0
- PROCESS_SHUTDOWN_SECONDS =
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/openc3/operators/operator.rb', line 230 def initialize Logger.level = Logger::INFO Logger.microservice_name = 'MicroserviceOperator' OperatorProcess.setup() @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds @ruby_process_name = ENV['OPENC3_RUBY'] if RUBY_ENGINE != 'ruby' @ruby_process_name ||= 'jruby' else @ruby_process_name ||= 'ruby' end @processes = {} @new_processes = {} @changed_processes = {} @removed_processes = {} @mutex = Mutex.new @shutdown = false @shutdown_complete = false end |
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
223 224 225 |
# File 'lib/openc3/operators/operator.rb', line 223 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
223 224 225 |
# File 'lib/openc3/operators/operator.rb', line 223 def processes @processes end |
Class Method Details
.instance ⇒ Object
381 382 383 |
# File 'lib/openc3/operators/operator.rb', line 381 def self.instance @@instance end |
.processes ⇒ Object
377 378 379 |
# File 'lib/openc3/operators/operator.rb', line 377 def self.processes @@instance.processes end |
.run ⇒ Object
372 373 374 375 |
# File 'lib/openc3/operators/operator.rb', line 372 def self.run @@instance = self.new @@instance.run end |
Instance Method Details
#remove_old ⇒ Object
281 282 283 284 285 286 287 288 289 |
# File 'lib/openc3/operators/operator.rb', line 281 def remove_old @mutex.synchronize do if @removed_processes.length > 0 Logger.info("Shutting down #{@removed_processes.length} removed microservices...") shutdown_processes(@removed_processes) @removed_processes = {} end end end |
#respawn_changed ⇒ Object
268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/openc3/operators/operator.rb', line 268 def respawn_changed @mutex.synchronize do if @changed_processes.length > 0 Logger.info("Cycling #{@changed_processes.length} changed microservices...") shutdown_processes(@changed_processes) break if @shutdown @changed_processes.each { |name, p| p.start } @changed_processes = {} end end end |
#respawn_dead ⇒ Object
291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/openc3/operators/operator.rb', line 291 def respawn_dead @mutex.synchronize do @processes.each do |name, p| break if @shutdown p.output_increment unless p.alive? # Respawn process output = p.extract_output Logger.error("Unexpected process died... respawning! #{p.cmd_line}\n#{output}\n", scope: p.scope) p.start end end end end |
#run ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/openc3/operators/operator.rb', line 334 def run # Use at_exit to shutdown cleanly at_exit do @shutdown = true @mutex.synchronize do Logger.info("Shutting down processes...") shutdown_processes(@processes) Logger.info("Shutting down processes complete") @shutdown_complete = true end end # Monitor processes and respawn if died Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...") loop do update() remove_old() respawn_changed() start_new() respawn_dead() break if @shutdown sleep(@cycle_time) break if @shutdown end loop do break if @shutdown_complete sleep(0.1) end ensure Logger.info("#{self.class} shutdown complete") end |
#shutdown_processes(processes) ⇒ Object
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/openc3/operators/operator.rb', line 306 def shutdown_processes(processes) Logger.info("Commanding soft stops...") processes.each { |name, p| p.soft_stop } start_time = Time.now # Allow sufficient time for processes to shutdown cleanly while (Time.now - start_time) < PROCESS_SHUTDOWN_SECONDS processes_to_remove = [] processes.each do |name, p| unless p.alive? processes_to_remove << name Logger.info("Soft stop process successful: #{p.cmd_line}", scope: p.scope) end end processes_to_remove.each do |name| processes.delete(name) end if processes.length <= 0 Logger.info("Soft stop all successful") break end sleep(0.1) end if processes.length > 0 Logger.info("Commanding hard stops...") processes.each { |name, p| p.hard_stop } end end |
#start_new ⇒ Object
257 258 259 260 261 262 263 264 265 266 |
# File 'lib/openc3/operators/operator.rb', line 257 def start_new @mutex.synchronize do if @new_processes.length > 0 # Start all the processes Logger.info("#{self.class} starting each new process...") @new_processes.each { |name, p| p.start } @new_processes = {} end end end |
#stop ⇒ Object
368 369 370 |
# File 'lib/openc3/operators/operator.rb', line 368 def stop @shutdown = true end |
#update ⇒ Object
253 254 255 |
# File 'lib/openc3/operators/operator.rb', line 253 def update raise "Implement in subclass" end |