Class: OpenC3::ReactionWorker
- Defined in:
- lib/openc3/microservices/reaction_microservice.rb
Overview
The Reaction worker is a very simple thread pool worker. Once the manager queues a trigger to evaluate against the reactions, the worker checks the reactions to see whether it needs to fire any of them.
Constant Summary collapse
- REACTION_METRIC_NAME =
'reaction_duration_seconds'.freeze
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#share ⇒ Object
readonly
Returns the value of attribute share.
Instance Method Summary collapse
- #get_token(username) ⇒ Object
-
#initialize(name:, logger:, scope:, share:, ident:) ⇒ ReactionWorker
constructor
A new instance of ReactionWorker.
- #process_enabled_trigger(data:) ⇒ Object
- #reaction(data:) ⇒ Object
- #run ⇒ Object
- #run_action(reaction:, action:) ⇒ Object
- #run_command(reaction:, action:) ⇒ Object
- #run_reaction(reaction:) ⇒ Object
- #run_script(reaction:, action:) ⇒ Object
Constructor Details
#initialize(name:, logger:, scope:, share:, ident:) ⇒ ReactionWorker
Returns a new instance of ReactionWorker.
229 230 231 232 233 234 235 236 237 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 229

# Stores the worker's collaborators and creates the Metric object used to
# record timing samples.
#
# @param name [Object] worker name; also passed to Metric as the microservice
# @param logger [Object] logger the worker writes its messages to
# @param scope [Object] scope; also passed to Metric
# @param share [Object] shared state object kept in @share
# @param ident [Object] identifier kept in @ident
def initialize(name:, logger:, scope:, share:, ident:)
  # 0 means the first comparison against the current time in #run succeeds.
  @metric_output_time = 0
  @ident = ident
  @share = share
  @logger = logger
  @scope = scope
  @name = name
  @metric = Metric.new(microservice: name, scope: scope)
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
227 228 229 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

# @return [Object] the value held in @name
def name
  return @name
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
227 228 229 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

# @return [Object] the value held in @scope
def scope
  return @scope
end
#share ⇒ Object (readonly)
Returns the value of attribute share.
227 228 229 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

# @return [Object] the value held in @share
def share
  return @share
end
Instance Method Details
#get_token(username) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 239

# Obtains an authentication token for the given user.
#
# When the OPENC3_API_CLIENT environment variable is not set, the token comes
# from OpenC3Authentication. Otherwise the user's offline access model is looked
# up and its offline access token is exchanged through
# OpenC3KeycloakAuthentication (using OPENC3_KEYCLOAK_URL).
#
# @param username [String, nil] user whose offline access model is looked up;
#   nil or '' skips the lookup
# @return [Object, nil] the token, or nil when no model or offline access token
#   is available
def get_token(username)
  return OpenC3Authentication.new.token if ENV['OPENC3_API_CLIENT'].nil?

  # Check for offline access token
  model = nil
  if username && username != ''
    model = OpenC3::OfflineAccessModel.get_model(name: username, scope: @scope)
  end
  return nil unless model && model.offline_access_token

  keycloak = OpenC3KeycloakAuthentication.new(ENV['OPENC3_KEYCLOAK_URL'])
  return keycloak.get_token_from_refresh_token(model.offline_access_token)
end
#process_enabled_trigger(data:) ⇒ Object
283 284 285 286 287 288 289 290 291 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 283

# Runs every reaction returned for the trigger named in data['name'] and
# records how long the whole pass took as a 'trigger' metric sample.
#
# @param data [Hash] trigger data; only data['name'] is read here
def process_enabled_trigger(data:)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  matching = @share.reaction_base.get_reactions(trigger_name: data['name'])
  matching.each { |reaction| run_reaction(reaction: reaction) }
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at # seconds as a float
  @metric.add_sample(
    name: REACTION_METRIC_NAME,
    value: elapsed,
    labels: { 'type' => 'trigger', 'thread' => "worker-#{@ident}" }
  )
end
#reaction(data:) ⇒ Object
255 256 257 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 255

# Builds a ReactionModel from queued data.
#
# @param data [Hash] queued data; data['name'] and data['scope'] are passed
#   along as the name and scope keywords
# @return [Object] whatever ReactionModel.from_json returns
def reaction(data:)
  model_name = data['name']
  model_scope = data['scope']
  ReactionModel.from_json(data, name: model_name, scope: model_scope)
end
#run ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 259

# Worker loop: pops (kind, data) pairs from the shared queue and dispatches
# them until a pair with a nil kind or nil data is popped.
#
# 'reaction' items are converted with #reaction and run directly; 'trigger'
# items go through #process_enabled_trigger. After each handled item the
# metrics are output if the stored output time has passed, and the next output
# time is set 120 seconds ahead. Any StandardError raised while handling an
# item is logged and the loop continues.
def run
  @logger.info "ReactionWorker-#{@ident} running"
  loop do
    begin
      kind, data = @share.queue_base.queue.pop
      break if [kind, data].any?(&:nil?)

      case kind
      when 'reaction' then run_reaction(reaction: reaction(data: data))
      when 'trigger' then process_enabled_trigger(data: data)
      end

      now = Time.now.to_i
      next unless @metric_output_time < now

      @metric.output
      @metric_output_time = now + 120
    rescue StandardError => e
      @logger.error "ReactionWorker-#{@ident} failed to evaluate kind: #{kind} data: #{data}\n#{e.formatted}"
    end
  end
  @logger.info "ReactionWorker-#{@ident} exiting"
end
#run_action(reaction:, action:) ⇒ Object
304 305 306 307 308 309 310 311 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 304

# Dispatches a single action to the handler matching action['type'].
# Types other than 'command' and 'script' are ignored.
#
# @param reaction [Object] reaction the action belongs to
# @param action [Hash] action description; action['type'] selects the handler
# @return [Object, nil] the handler's result, or nil when no handler matches
def run_action(reaction:, action:)
  handlers = { 'command' => :run_command, 'script' => :run_script }
  handler = handlers[action['type']]
  return nil unless handler

  send(handler, reaction: reaction, action: action)
end
#run_command(reaction:, action:) ⇒ Object
313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 313

# Sends the command in action['value'] using a token obtained for the
# reaction's user. Any StandardError (including a missing token) is logged
# rather than raised.
#
# @param reaction [Object] reaction being run; its name and username are read
# @param action [Hash] action description; action['value'] is the command
def run_command(reaction:, action:)
  @logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, command: '#{action['value']}' "
  begin
    username = reaction.username
    token = get_token(username)
    raise "No token available for username: #{username}" unless token

    cmd_no_hazardous_check(action['value'], scope: @scope, token: token)
    @logger.info "ReactionWorker-#{@ident} #{reaction.name} command action complete, #{action['value']}"
  rescue StandardError => e
    # Include the exception message so the log shows why the action failed.
    @logger.error "ReactionWorker-#{@ident} #{reaction.name} command action failed, #{action}\n#{e.message}"
  end
end
#run_reaction(reaction:) ⇒ Object
293 294 295 296 297 298 299 300 301 302 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 293

# Runs each of the reaction's actions in order, then calls sleep on the shared
# reaction base for this reaction's name and records the elapsed time as a
# 'reaction' metric sample.
#
# @param reaction [Object] reaction to run; its actions and name are read
def run_reaction(reaction:)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  reaction.actions.each { |step| run_action(reaction: reaction, action: step) }
  @share.reaction_base.sleep(name: reaction.name)
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at # seconds as a float
  @metric.add_sample(
    name: REACTION_METRIC_NAME,
    value: elapsed,
    labels: { 'type' => 'reaction', 'thread' => "worker-#{@ident}" }
  )
end
#run_script(reaction:, action:) ⇒ Object
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/openc3/microservices/reaction_microservice.rb', line 326

# Asks the script runner API to run the script named in action['value'].
#
# A POST is sent to /script-api/scripts/<value>/run on port 2902 of the host in
# OPENC3_SCRIPT_HOSTNAME (default 'openc3-cosmos-script-runner-api'), authorized
# with a token obtained for the reaction's user. A missing token, a response
# code other than '200', or any other StandardError is logged rather than raised.
#
# @param reaction [Object] reaction being run; its name and username are read
# @param action [Hash] action description; 'value' is the script and
#   'environment' is forwarded in the request body
def run_script(reaction:, action:)
  @logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, script: '#{action['value']}'"
  begin
    username = reaction.username
    token = get_token(username)
    raise "No token available for username: #{username}" unless token

    request = Net::HTTP::Post.new(
      "/script-api/scripts/#{action['value']}/run?scope=#{@scope}",
      'Content-Type' => 'application/json',
      'Authorization' => token
    )
    request.body = JSON.generate({
      'scope' => @scope,
      'environment' => action['environment'],
      'reaction' => reaction.name,
      'id' => Time.now.to_i
    })
    hostname = ENV['OPENC3_SCRIPT_HOSTNAME'] || 'openc3-cosmos-script-runner-api'
    response = Net::HTTP.new(hostname, 2902).request(request)
    # Net::HTTP reports the status code as a String.
    raise "failed to call #{hostname}, for script: #{action['value']}, response code: #{response.code}" if response.code != '200'

    @logger.info "ReactionWorker-#{@ident} #{reaction.name} script action complete, #{action['value']} => #{response.body}"
  rescue StandardError => e
    # Include the exception message so the log shows why the action failed.
    @logger.error "ReactionWorker-#{@ident} #{reaction.name} script action failed, #{action}\n#{e.message}"
  end
end