Class: OpenC3::Operator

Inherits:
Object show all
Defined in:
lib/openc3/operators/operator.rb

Direct Known Subclasses

MicroserviceOperator

Constant Summary collapse

CYCLE_TIME =

Cycle time, in seconds, between checks for new microservices

5.0
PROCESS_SHUTDOWN_SECONDS =
5.0
@@instance =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Operator

Returns a new instance of Operator.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/openc3/operators/operator.rb', line 236

# Configures logging, prepares OperatorProcess, and resets all operator state.
#
# The polling interval comes from the OPERATOR_CYCLE_TIME environment variable
# (converted with to_f) and falls back to CYCLE_TIME when the variable is not
# set. The Ruby executable name comes from OPENC3_RUBY; when that is not set it
# is 'ruby' if RUBY_ENGINE is 'ruby' and 'jruby' otherwise.
def initialize
  Logger.level = Logger::INFO
  Logger.microservice_name = 'MicroserviceOperator'
  OperatorProcess.setup

  cycle_override = ENV['OPERATOR_CYCLE_TIME']
  @cycle_time = cycle_override ? cycle_override.to_f : CYCLE_TIME # time in seconds

  fallback_ruby = RUBY_ENGINE == 'ruby' ? 'ruby' : 'jruby'
  @ruby_process_name = ENV['OPENC3_RUBY'] || fallback_ruby

  # Process tables: everything known, plus the three pending work sets
  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}

  # Lock taken by every method that walks or swaps the tables above
  @mutex = Mutex.new

  # Shutdown handshake: requested, then completed
  @shutdown = false
  @shutdown_complete = false
end

Instance Attribute Details

#cycle_time ⇒ Object (readonly)

Returns the value of attribute cycle_time.



229
230
231
# File 'lib/openc3/operators/operator.rb', line 229

# Reader for @cycle_time.
#
# @return [Float] seconds that run sleeps between monitoring passes, as
#   assigned in initialize
def cycle_time
  @cycle_time
end

#processes ⇒ Object (readonly)

Returns the value of attribute processes.



229
230
231
# File 'lib/openc3/operators/operator.rb', line 229

# Reader for @processes.
#
# @return [Hash] the process table; initialize creates it empty, respawn_dead
#   iterates it as name => process pairs, and shutdown passes it to
#   shutdown_processes
def processes
  @processes
end

Class Method Details

.instance ⇒ Object



389
390
391
# File 'lib/openc3/operators/operator.rb', line 389

# Returns the operator held in the @@instance class variable.
#
# @return [Operator, nil] the object assigned by self.run; @@instance starts
#   out as nil
def self.instance
  @@instance
end

.processes ⇒ Object



385
386
387
# File 'lib/openc3/operators/operator.rb', line 385

# Convenience accessor that forwards to #processes on @@instance.
#
# NOTE(review): @@instance starts out as nil and is only assigned by self.run,
# so calling this earlier would raise NoMethodError - confirm that callers
# only use it once the operator is running.
#
# @return [Hash] the process table of the stored operator
def self.processes
  @@instance.processes
end

.run ⇒ Object



380
381
382
383
# File 'lib/openc3/operators/operator.rb', line 380

# Builds an operator of the receiving class, publishes it through @@instance,
# and then enters its monitoring loop.
#
# @return whatever the instance-level #run returns
def self.run
  operator = new
  # Store the instance before the loop starts so self.instance and
  # self.processes can reach it while #run is executing
  @@instance = operator
  operator.run
end

Instance Method Details

#remove_old ⇒ Object



287
288
289
290
291
292
293
294
295
# File 'lib/openc3/operators/operator.rb', line 287

# Stops every process queued in @removed_processes and then empties the queue.
#
# Does nothing when the queue is empty. Runs entirely under @mutex.
def remove_old
  @mutex.synchronize do
    next if @removed_processes.empty?

    removed_count = @removed_processes.length
    Logger.info("Shutting down #{removed_count} removed microservices...")
    shutdown_processes(@removed_processes)
    # Start the next cycle with a fresh, empty queue
    @removed_processes = {}
  end
end

#respawn_changed ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/openc3/operators/operator.rb', line 274

# Stops and restarts every process queued in @changed_processes, then empties
# the queue.
#
# Does nothing when the queue is empty. If @shutdown is set by the time the
# processes have been stopped, nothing is restarted and the queue is left as
# it is. Runs entirely under @mutex.
def respawn_changed
  @mutex.synchronize do
    next if @changed_processes.empty?

    Logger.info("Cycling #{@changed_processes.length} changed microservices...")
    # Hand over a copy so the restart below never depends on whether
    # shutdown_processes prunes the hash it is given; every queued process
    # must be started again, including those that stopped cleanly.
    shutdown_processes(@changed_processes.dup)
    next if @shutdown

    @changed_processes.each_value { |process| process.start }
    @changed_processes = {}
  end
end

#respawn_dead ⇒ Object



297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/openc3/operators/operator.rb', line 297

# Walks @processes and restarts any process that is no longer alive.
#
# Each process first gets output_increment. A process that reports not alive
# has its output extracted and logged as an error before being started again.
# The walk stops as soon as @shutdown is set. Runs entirely under @mutex.
def respawn_dead
  @mutex.synchronize do
    @processes.each_value do |process|
      break if @shutdown

      process.output_increment
      next if process.alive?

      # Respawn process, logging whatever output it left behind
      captured = process.extract_output
      Logger.error("Unexpected process died... respawning! #{process.cmd_line}\n#{captured}\n", scope: process.scope)
      process.start
    end
  end
end

#run ⇒ Object



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/openc3/operators/operator.rb', line 350

# Main loop of the operator.
#
# Registers shutdown as an at_exit handler, then repeats a monitoring pass
# (update, remove_old, respawn_changed, start_new, respawn_dead) followed by a
# sleep of @cycle_time seconds until @shutdown is set. After leaving the loop
# it waits for @shutdown_complete, and it always logs a completion message on
# the way out.
def run
  # Use at_exit to shutdown cleanly
  at_exit { shutdown }

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update
    remove_old
    respawn_changed
    start_new
    respawn_dead
    break if @shutdown

    sleep(@cycle_time)
    # Checked again so a request made during the sleep skips the next pass
    break if @shutdown
  end

  # Poll until shutdown has flagged completion
  sleep(0.1) until @shutdown_complete
ensure
  Logger.info("#{self.class} shutdown complete")
end

#shutdown ⇒ Object



340
341
342
343
344
345
346
347
348
# File 'lib/openc3/operators/operator.rb', line 340

# Stops every process in @processes and marks the shutdown as finished.
#
# run registers this method as an at_exit handler.
def shutdown
  # Set before taking the lock: respawn_changed and respawn_dead check this
  # flag while they hold the mutex
  @shutdown = true
  @mutex.synchronize do
    Logger.info("Shutting down processes...")
    shutdown_processes(@processes)
    Logger.info("Shutting down processes complete")
    # run polls this flag before it returns
    @shutdown_complete = true
  end
end

#shutdown_processes(processes) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/openc3/operators/operator.rb', line 312

# Stops the given processes, first with a soft stop and then, if needed, with
# a hard stop.
#
# Every process receives soft_stop and is then polled until it reports not
# alive or PROCESS_SHUTDOWN_SECONDS have elapsed. Whatever is still alive
# after that grace period receives hard_stop. The caller's hash is never
# modified, so callers can keep iterating it afterwards.
#
# @param processes [Hash] name => process object responding to soft_stop,
#   hard_stop, alive?, cmd_line and scope
def shutdown_processes(processes)
  # Track survivors in a private copy so the caller's hash stays intact
  remaining = processes.dup

  Logger.info("Commanding soft stops...")
  remaining.each_value { |process| process.soft_stop }

  # A monotonic clock keeps the grace period immune to wall-clock adjustments
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + PROCESS_SHUTDOWN_SECONDS
  # Allow sufficient time for processes to shutdown cleanly
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    # Drop every process that has exited since the previous poll
    remaining.delete_if do |_name, process|
      next false if process.alive?

      Logger.info("Soft stop process successful: #{process.cmd_line}", scope: process.scope)
      true
    end
    if remaining.empty?
      Logger.info("Soft stop all successful")
      break
    end
    sleep(0.1)
  end

  return if remaining.empty?

  Logger.info("Commanding hard stops...")
  remaining.each_value { |process| process.hard_stop }
end

#start_new ⇒ Object



263
264
265
266
267
268
269
270
271
272
# File 'lib/openc3/operators/operator.rb', line 263

# Starts every process queued in @new_processes and then empties the queue.
#
# Does nothing when the queue is empty. Runs entirely under @mutex.
def start_new
  @mutex.synchronize do
    next if @new_processes.empty?

    Logger.info("#{self.class} starting each new process...")
    @new_processes.each_value { |process| process.start }
    # Start the next cycle with a fresh, empty queue
    @new_processes = {}
  end
end

#stop ⇒ Object



376
377
378
# File 'lib/openc3/operators/operator.rb', line 376

# Requests that the monitoring loop in run exit by setting @shutdown.
#
# This only raises the flag; it does not stop any process itself.
# NOTE(review): after leaving its loop, run waits for @shutdown_complete,
# which only shutdown sets - confirm that callers of stop also trigger
# shutdown.
def stop
  @shutdown = true
end

#update ⇒ Object



259
260
261
# File 'lib/openc3/operators/operator.rb', line 259

# Hook invoked at the start of every monitoring pass in run.
#
# The base implementation always raises; subclasses are expected to override
# it.
#
# @raise [RuntimeError] always, with the message "Implement in subclass"
def update
  raise "Implement in subclass"
end