Class: OpenC3::TriggerGroupWorker
- Defined in:
- lib/openc3/microservices/trigger_group_microservice.rb
Overview
The TriggerGroupWorker is a very simple thread pool worker. Once the trigger manager has pushed a packet to the queue one of these workers will evaluate the triggers for that packet.
Constant Summary collapse
- TYPE =
'type'.freeze
- ITEM_TARGET =
'target'.freeze
- ITEM_PACKET =
'packet'.freeze
- ITEM_TYPE =
'item'.freeze
- ITEM_VALUE_TYPE =
'valueType'.freeze
- FLOAT_TYPE =
'float'.freeze
- STRING_TYPE =
'string'.freeze
- REGEX_TYPE =
'regex'.freeze
- LIMIT_TYPE =
'limit'.freeze
- TRIGGER_TYPE =
'trigger'.freeze
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#packet ⇒ Object
readonly
Returns the value of attribute packet.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#target ⇒ Object
readonly
Returns the value of attribute target.
Instance Method Summary collapse
-
#evaluate(name:, left:, operator:, right:) ⇒ Object
the base evaluate method used by evaluate_trigger -1 (the value is considered an error used to disable the trigger) 0 (the value is considered as a false value) 1 (the value is considered as a true value).
-
#evaluate_data_packet(topic:) ⇒ Object
Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.
-
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing…
-
#get_packet_limit(operand:, other:) ⇒ Object
extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN.
-
#get_packet_value(operand:, previous:) ⇒ Object
extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted.
-
#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
constructor
A new instance of TriggerGroupWorker.
- #notify(name:, severity:, message:) ⇒ Object
-
#operand_value(operand:, other:, visited:, previous: false) ⇒ Object
extract the value of the operand from the packet.
- #run ⇒ Object
Constructor Details
#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
Returns a new instance of TriggerGroupWorker.
267 268 269 270 271 272 273 274 275 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 267 def initialize(name:, logger:, scope:, group:, queue:, share:, ident:) @name = name @logger = logger @scope = scope @group = group @queue = queue @share = share @ident = ident end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
265 266 267 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 265 def group @group end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
265 266 267 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 265 def name @name end |
#packet ⇒ Object (readonly)
Returns the value of attribute packet.
265 266 267 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 265 def packet @packet end |
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
265 266 267 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 265 def scope @scope end |
#target ⇒ Object (readonly)
Returns the value of attribute target.
265 266 267 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 265 def target @target end |
Instance Method Details
#evaluate(name:, left:, operator:, right:) ⇒ Object
the base evaluate method used by evaluate_trigger
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 392 def evaluate(name:, left:, operator:, right:) @logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left}(#{left.class}) #{operator} #{right}(#{right.class}))" begin case operator when '>' return left > right ? 1 : 0 when '<' return left < right ? 1 : 0 when '>=' return left >= right ? 1 : 0 when '<=' return left <= right ? 1 : 0 when '!=', 'CHANGES' return left != right ? 1 : 0 when '==', 'DOES NOT CHANGE' return left == right ? 1 : 0 when '!~' return left !~ right ? 1 : 0 when '=~' return left =~ right ? 1 : 0 when 'AND' return left && right ? 1 : 0 when 'OR' return left || right ? 1 : 0 end rescue ArgumentError => error = "invalid evaluate: (#{left} #{operator} #{right})" notify(name: name, severity: 'error', message: ) return -1 end end |
#evaluate_data_packet(topic:) ⇒ Object
Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 309 def evaluate_data_packet(topic:) visited = Hash.new @logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}" @share.trigger_base.get_triggers(topic: topic).each do |trigger| @logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}" value = evaluate_trigger( head: trigger, trigger: trigger, visited: visited, triggers: @share.trigger_base.enabled_triggers ) @logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}" # value MUST be -1, 0, or 1 @share.trigger_base.update_state(name: trigger.name, value: value) end end |
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing… So this is a recursive method for the TriggerGroupWorkers to call. It will use the trigger name and append a __P for path or __R for result. The Path is a Hash that contains a key for each node traveled to get results. When the result has been found it will be stored in the result key __R in the visited Hash and eval_trigger will return a number.
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
IF an operand is evaluated as nil it will log an error and return -1 IF a loop is detected it will log an error and return -1
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 436 def evaluate_trigger(head:, trigger:, visited:, triggers:) if visited["#{trigger.name}__R"] return visited["#{trigger.name}__R"] end if visited["#{trigger.name}__P"].nil? visited["#{trigger.name}__P"] = Hash.new end if visited["#{head.name}__P"][trigger.name] # Not sure if this is posible as on create it validates that the dependents are already created = "loop detected from #{head.name} -> #{trigger.name} path: #{visited["#{head.name}__P"]}" notify(name: trigger.name, severity: 'error', message: error.) return visited["#{trigger.name}__R"] = -1 end trigger.roots.each do | root_trigger_name | next if visited["#{root_trigger_name}__R"] root_trigger = triggers[root_trigger_name] if head.name == root_trigger.name = "loop detected from #{head.name} -> #{root_trigger_name} path: #{visited["#{head.name}__P"]}" notify(name: trigger.name, severity: 'error', message: error.) return visited["#{trigger.name}__R"] = -1 end result = evaluate_trigger( head: head, trigger: root_trigger, visited: visited, triggers: triggers ) @logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}" visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result end begin left = operand_value(operand: trigger.left, other: trigger.right, visited: visited) if trigger.operator.include?('CHANGE') right = operand_value(operand: trigger.left, other: trigger.right, visited: visited, previous: true) else right = operand_value(operand: trigger.right, other: trigger.left, visited: visited) end rescue => error # This will primarily happen when the user inputs a bad Regexp notify(name: trigger.name, severity: 'error', message: error.) return visited["#{trigger.name}__R"] = -1 end # Convert the standard '==' and '!=' into Ruby Regexp operators operator = trigger.operator if right and right.is_a? Regexp operator = '=~' if operator == '==' operator = '!~' if operator == '!=' end if left.nil? || right.nil? return visited["#{trigger.name}__R"] = 0 end result = evaluate(name: trigger.name,left: left, operator: operator, right: right) return visited["#{trigger.name}__R"] = result end |
#get_packet_limit(operand:, other:) ⇒ Object
extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN
330 331 332 333 334 335 336 337 338 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 330 def get_packet_limit(operand:, other:) packet = @share.packet_base.packet( target: operand[ITEM_TARGET], packet: operand[ITEM_PACKET] ) return nil if packet.nil? _, limit = packet.read_with_limits_state(operand[ITEM_TYPE], operand[ITEM_VALUE_TYPE].intern) return limit end |
#get_packet_value(operand:, previous:) ⇒ Object
extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 342 def get_packet_value(operand:, previous:) if previous packet = @share.packet_base.previous_packet( target: operand[ITEM_TARGET], packet: operand[ITEM_PACKET] ) # Previous might not be populated ... that's ok just return nil return nil unless packet else packet = @share.packet_base.packet( target: operand[ITEM_TARGET], packet: operand[ITEM_PACKET] ) end # This shouldn't happen because the frontend provides valid items but good to check # The raise is ultimately rescued inside evaluate_trigger when operand_value is called raise "Packet #{operand[ITEM_TARGET]} #{operand[ITEM_PACKET]} not found" if packet.nil? value = packet.read(operand[ITEM_TYPE], operand[ITEM_VALUE_TYPE].intern) raise "Item #{operand[ITEM_TARGET]} #{operand[ITEM_PACKET]} #{operand[ITEM_TYPE]} not found" if value.nil? value end |
#notify(name:, severity:, message:) ⇒ Object
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 277 def notify(name:, severity:, message:) data = {} # All AutonomicTopic notifications must have 'name' and 'updated_at' in the data data['name'] = name data['updated_at'] = Time.now.to_nsec_from_epoch data['severity'] = severity data['message'] = notification = { 'kind' => 'error', 'type' => 'trigger', 'data' => JSON.generate(data), } AutonomicTopic.write_notification(notification, scope: @scope) @logger.public_send(severity.intern, ) end |
#operand_value(operand:, other:, visited:, previous: false) ⇒ Object
extract the value of the operand from the packet
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 365 def operand_value(operand:, other:, visited:, previous: false) if operand[TYPE] == ITEM_TYPE && other && other[TYPE] == LIMIT_TYPE return get_packet_limit(operand: operand, other: other) elsif operand[TYPE] == ITEM_TYPE return get_packet_value(operand: operand, previous: previous) elsif operand[TYPE] == TRIGGER_TYPE return visited["#{operand[TRIGGER_TYPE]}__R"] == 1 elsif operand[TYPE] == FLOAT_TYPE return operand[operand[TYPE]].to_f elsif operand[TYPE] == STRING_TYPE return operand[operand[TYPE]].to_s elsif operand[TYPE] == REGEX_TYPE # This can potentially throw an exception on badly formatted Regexp return Regexp.new(operand[operand[TYPE]]) elsif operand[TYPE] == LIMIT_TYPE return operand[operand[TYPE]] else # This is a logic error ... should never get here raise "Unknown operand type: #{operand}" end end |
#run ⇒ Object
293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 293 def run @logger.info "TriggerGroupWorker-#{@ident} running" loop do topic = @queue.pop break if topic.nil? begin evaluate_data_packet(topic: topic) rescue StandardError => e @logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}" end end @logger.info "TriggerGroupWorker-#{@ident} exiting" end |