Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/openc3/operators/operator.rb', line 151 def initialize Logger.level = Logger::INFO # TODO: This is pretty generic. Can we pass in more information to help identify the operator? Logger.microservice_name = 'MicroserviceOperator' Logger.tag = "operator.log" OperatorProcess.setup() @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds @ruby_process_name = ENV['OPENC3_RUBY'] if RUBY_ENGINE != 'ruby' @ruby_process_name ||= 'jruby' else @ruby_process_name ||= 'ruby' end @processes = {} @new_processes = {} @changed_processes = {} @removed_processes = {} @mutex = Mutex.new @shutdown = false @shutdown_complete = false end |
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
145 146 147 |
# File 'lib/openc3/operators/operator.rb', line 145 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
145 146 147 |
# File 'lib/openc3/operators/operator.rb', line 145 def processes @processes end |
Class Method Details
.instance ⇒ Object
288 289 290 |
# File 'lib/openc3/operators/operator.rb', line 288 def self.instance @@instance end |
.processes ⇒ Object
284 285 286 |
# File 'lib/openc3/operators/operator.rb', line 284 def self.processes @@instance.processes end |
.run ⇒ Object
279 280 281 282 |
# File 'lib/openc3/operators/operator.rb', line 279 def self.run @@instance = self.new @@instance.run end |
Instance Method Details
#remove_old ⇒ Object
204 205 206 207 208 209 210 211 212 |
# File 'lib/openc3/operators/operator.rb', line 204 def remove_old @mutex.synchronize do if @removed_processes.length > 0 Logger.info("Shutting down #{@removed_processes.length} removed microservices...") shutdown_processes(@removed_processes) @removed_processes = {} end end end |
#respawn_changed ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/openc3/operators/operator.rb', line 191 def respawn_changed @mutex.synchronize do if @changed_processes.length > 0 Logger.info("Cycling #{@changed_processes.length} changed microservices...") shutdown_processes(@changed_processes) break if @shutdown @changed_processes.each { |name, p| p.start } @changed_processes = {} end end end |
#respawn_dead ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/openc3/operators/operator.rb', line 214 def respawn_dead @mutex.synchronize do @processes.each do |name, p| break if @shutdown unless p.alive? # Respawn process p.stdout.rewind output = p.stdout.read p.stdout.close p.stdout.unlink p.stderr.rewind err_output = p.stderr.read p.stderr.close p.stderr.unlink Logger.error("Unexpected process died... respawning! #{p.process_definition.join(' ')}\nStdout:\n#{output}\nStderr:\n#{err_output}\n", scope: p.scope) p.start end end end end |
#run ⇒ Object
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/openc3/operators/operator.rb', line 242 def run # Use at_exit to shutdown cleanly at_exit do @shutdown = true @mutex.synchronize do Logger.info("Shutting down processes...") shutdown_processes(@processes) @shutdown_complete = true end end # Monitor processes and respawn if died Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...") loop do update() remove_old() respawn_changed() start_new() respawn_dead() break if @shutdown sleep(@cycle_time) break if @shutdown end loop do break if @shutdown_complete sleep(1) end ensure Logger.info("#{self.class} shutdown complete") end |
#shutdown_processes(processes) ⇒ Object
236 237 238 239 240 |
# File 'lib/openc3/operators/operator.rb', line 236 def shutdown_processes(processes) processes.each { |name, p| p.soft_stop } sleep(2) # TODO: This is an arbitrary sleep of 2s ... processes.each { |name, p| p.hard_stop } end |
#start_new ⇒ Object
180 181 182 183 184 185 186 187 188 189 |
# File 'lib/openc3/operators/operator.rb', line 180 def start_new @mutex.synchronize do if @new_processes.length > 0 # Start all the processes Logger.info("#{self.class} starting each new process...") @new_processes.each { |name, p| p.start } @new_processes = {} end end end |
#stop ⇒ Object
275 276 277 |
# File 'lib/openc3/operators/operator.rb', line 275 def stop @shutdown = true end |
#update ⇒ Object
176 177 178 |
# File 'lib/openc3/operators/operator.rb', line 176 def update raise "Implement in subclass" end |