Class: OpenC3::Operator

Inherits:
Object show all
Defined in:
lib/openc3/operators/operator.rb

Direct Known Subclasses

MicroserviceOperator

Constant Summary collapse

CYCLE_TIME =

Cycle time, in seconds, between checks for new microservices

5.0
@@instance =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Operator

Returns a new instance of Operator.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/openc3/operators/operator.rb', line 151

# Configures logging, prepares OperatorProcess, and sets up empty process
# tables plus the shutdown flags.
#
# Environment variables read:
# * OPERATOR_CYCLE_TIME - polling period in seconds, parsed with to_f;
#   CYCLE_TIME is used only when the variable is unset.
# * OPENC3_RUBY - name of the ruby executable; when unset, 'ruby' is used
#   on the 'ruby' engine and 'jruby' on any other engine.
def initialize
  Logger.level = Logger::INFO
  # TODO: This is pretty generic. Can we pass in more information to help identify the operator?
  Logger.microservice_name = 'MicroserviceOperator'
  Logger.tag = "operator.log"

  OperatorProcess.setup()

  # to_f never returns nil, so any value that is set (even a non-numeric
  # one, which parses as 0.0) takes precedence over CYCLE_TIME
  configured_cycle = ENV['OPERATOR_CYCLE_TIME']
  @cycle_time = configured_cycle ? configured_cycle.to_f : CYCLE_TIME # time in seconds

  engine_default = RUBY_ENGINE == 'ruby' ? 'ruby' : 'jruby'
  @ruby_process_name = ENV['OPENC3_RUBY'] || engine_default

  @mutex = Mutex.new
  @shutdown = false
  @shutdown_complete = false

  # Process tables, all starting out empty
  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}
end

Instance Attribute Details

#cycle_time ⇒ Object (readonly)

Returns the value of attribute cycle_time.



145
146
147
# File 'lib/openc3/operators/operator.rb', line 145

# Read-only accessor for the polling period.
#
# @return [Object] the current value of @cycle_time
def cycle_time
  @cycle_time
end

#processes ⇒ Object (readonly)

Returns the value of attribute processes.



145
146
147
# File 'lib/openc3/operators/operator.rb', line 145

# Read-only accessor for the table of processes this operator tracks.
#
# @return [Object] the current value of @processes (the same object, not a copy)
def processes
  @processes
end

Class Method Details

.instance ⇒ Object



288
289
290
# File 'lib/openc3/operators/operator.rb', line 288

# Class-level accessor for the shared operator.
#
# @return [Object] the current value of the @@instance class variable
def self.instance
  @@instance
end

.processes ⇒ Object



284
285
286
# File 'lib/openc3/operators/operator.rb', line 284

# Convenience accessor that forwards to #processes on the shared operator.
#
# @return [Object] whatever @@instance.processes returns
def self.processes
  @@instance.processes
end

.run ⇒ Object



279
280
281
282
# File 'lib/openc3/operators/operator.rb', line 279

# Builds an operator of the receiving class, stores it in @@instance, and
# then calls #run on it.
#
# @return [Object] whatever the instance's #run returns
def self.run
  @@instance = new
  @@instance.run
end

Instance Method Details

#remove_old ⇒ Object



204
205
206
207
208
209
210
211
212
# File 'lib/openc3/operators/operator.rb', line 204

# Shuts down every process queued in @removed_processes and then empties
# the queue. Does nothing when the queue is empty. Runs while holding @mutex.
#
# @return [Hash, nil] the fresh empty queue, or nil when there was nothing to remove
def remove_old
  @mutex.synchronize do
    next if @removed_processes.empty?

    Logger.info("Shutting down #{@removed_processes.length} removed microservices...")
    shutdown_processes(@removed_processes)
    @removed_processes = {}
  end
end

#respawn_changed ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/openc3/operators/operator.rb', line 191

# Cycles every process queued in @changed_processes: shuts them all down,
# then starts each one again and empties the queue. Runs while holding @mutex.
#
# If @shutdown is set once the processes have been stopped, they are not
# restarted and the queue is left as it was.
#
# @return [Hash, nil] the fresh empty queue after a restart, otherwise nil
def respawn_changed
  @mutex.synchronize do
    next if @changed_processes.empty?

    Logger.info("Cycling #{@changed_processes.length} changed microservices...")
    shutdown_processes(@changed_processes)
    # Shutdown was requested in the meantime, so do not bring anything back up
    next if @shutdown

    @changed_processes.each_value(&:start)
    @changed_processes = {}
  end
end

#respawn_dead ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/openc3/operators/operator.rb', line 214

# Walks @processes while holding @mutex and restarts any process that is no
# longer alive. Before restarting, the dead process's stdout and stderr are
# read in full, closed and unlinked, and their contents are logged as an error.
#
# The walk stops early as soon as @shutdown is set.
#
# @return [Hash, nil] @processes after a full walk, or nil when stopped early
def respawn_dead
  # Reads an output stream from the beginning, then closes and unlinks it
  drain = lambda do |stream|
    stream.rewind
    contents = stream.read
    stream.close
    stream.unlink
    contents
  end

  @mutex.synchronize do
    @processes.each_value do |process|
      break if @shutdown
      next if process.alive?

      # Respawn process
      captured_out = drain.call(process.stdout)
      captured_err = drain.call(process.stderr)
      Logger.error("Unexpected process died... respawning! #{process.process_definition.join(' ')}\nStdout:\n#{captured_out}\nStderr:\n#{captured_err}\n", scope: process.scope)
      process.start
    end
  end
end

#run ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/openc3/operators/operator.rb', line 242

# Runs the operator's monitoring loop until shutdown is flagged, then waits
# for @shutdown_complete before returning.
#
# Each pass calls update, remove_old, respawn_changed, start_new and
# respawn_dead, in that order, and then sleeps for @cycle_time seconds.
def run
  # Register an at_exit hook that flags shutdown and then, while holding
  # @mutex, shuts down everything in @processes and sets @shutdown_complete
  at_exit do
    @shutdown = true
    @mutex.synchronize do
      Logger.info("Shutting down processes...")
      shutdown_processes(@processes)
      @shutdown_complete = true
    end
  end

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update()
    remove_old()
    respawn_changed()
    start_new()
    respawn_dead()
    # @shutdown is checked both before and after the sleep, so a shutdown
    # flagged during the work or during the sleep ends the loop without
    # starting another pass
    break if @shutdown

    sleep(@cycle_time)
    break if @shutdown
  end

  # Poll once a second until @shutdown_complete is set.
  # NOTE(review): in the code visible here only the at_exit hook sets
  # @shutdown_complete, while #stop only sets @shutdown - so after #stop this
  # loop appears to wait until the process exits; confirm that is intended.
  loop do
    break if @shutdown_complete
    sleep(1)
  end
ensure
  # Logged however the method is left, including when an exception unwinds it
  Logger.info("#{self.class} shutdown complete")
end

#shutdown_processes(processes) ⇒ Object



236
237
238
239
240
# File 'lib/openc3/operators/operator.rb', line 236

# Stops every process in the given collection: a soft stop for each one, a
# fixed grace period, then a hard stop for each one.
#
# An empty collection returns immediately, so callers that pass an empty
# table (for example the at_exit hook in #run when nothing is tracked) do not
# pay the grace-period delay for no reason.
#
# @param processes [Hash] process name => object responding to soft_stop and hard_stop
# @return [Hash] the collection that was passed in
def shutdown_processes(processes)
  # Nothing to stop, so skip the grace-period sleep entirely
  return processes if processes.empty?

  processes.each { |_name, process| process.soft_stop }
  sleep(2) # TODO: This is an arbitrary sleep of 2s ...
  processes.each { |_name, process| process.hard_stop }
end

#start_new ⇒ Object



180
181
182
183
184
185
186
187
188
189
# File 'lib/openc3/operators/operator.rb', line 180

# Starts every process queued in @new_processes and then empties the queue.
# Does nothing when the queue is empty. Runs while holding @mutex.
#
# @return [Hash, nil] the fresh empty queue, or nil when there was nothing to start
def start_new
  @mutex.synchronize do
    next if @new_processes.empty?

    # Start all the processes
    Logger.info("#{self.class} starting each new process...")
    @new_processes.each_value(&:start)
    @new_processes = {}
  end
end

#stop ⇒ Object



275
276
277
# File 'lib/openc3/operators/operator.rb', line 275

# Requests shutdown by setting the @shutdown flag. It does not stop any
# process itself; it only sets the flag.
#
# @return [true]
def stop
  @shutdown = true
end

#update ⇒ Object



176
177
178
# File 'lib/openc3/operators/operator.rb', line 176

# Placeholder that subclasses are expected to override; this base version
# always raises.
#
# @raise [RuntimeError] always, with the message "Implement in subclass"
def update
  raise RuntimeError, "Implement in subclass"
end