Class: OpenC3::TriggerGroupWorker
- Defined in:
- lib/openc3/microservices/trigger_group_microservice.rb
Overview
The TriggerGroupWorker is a very simple thread pool worker. Once the trigger manager has pushed a packet to the queue, one of these workers picks it up and evaluates the triggers associated with that packet.
Constant Summary
- TRIGGER_METRIC_NAME = 'trigger_eval_duration_seconds'.freeze
- TYPE = 'type'.freeze
- ITEM_RAW = 'raw'.freeze
- ITEM_TARGET = 'target'.freeze
- ITEM_PACKET = 'packet'.freeze
- ITEM_TYPE = 'item'.freeze
- FLOAT_TYPE = 'float'.freeze
- STRING_TYPE = 'string'.freeze
- LIMIT_TYPE = 'limit'.freeze
- TRIGGER_TYPE = 'trigger'.freeze
Instance Attribute Summary
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#packet ⇒ Object
readonly
Returns the value of attribute packet.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#target ⇒ Object
readonly
Returns the value of attribute target.
Instance Method Summary
-
#evaluate(left:, operator:, right:) ⇒ Object
The base evaluate method used by evaluate_trigger. Returns -1 (error, used to disable the trigger), 0 (false), or 1 (true).
-
#evaluate_data_packet(topic:, triggers:) ⇒ Object
Each packet is evaluated against all of its triggers, and the results are sent back to the topic for use by the reaction microservice.
-
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing…
-
#evaluate_wrapper(topic:) ⇒ Object
Times how long each packet takes to evaluate and publishes the duration as a metric.
-
#get_packet_limit(operand:, other:) ⇒ Object
Extracts the limit state of the packet item named in the operand. If the limit in the other operand omits _LOW or _HIGH, only the color is matched, so an item limit of GREEN_LOW or GREEN_HIGH equals a limit of GREEN.
-
#get_packet_value(operand:) ⇒ Object
Extracts the value of the packet item named in the operand. If raw is set in the operand, the raw value is returned instead of the converted value.
-
#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
constructor
A new instance of TriggerGroupWorker.
-
#operand_value(operand:, other:, visited:) ⇒ Object
extract the value of the operand from the packet.
- #run ⇒ Object
Constructor Details
#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
Returns a new instance of TriggerGroupWorker.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 261

def initialize(name:, logger:, scope:, group:, queue:, share:, ident:)
  @name = name
  @logger = logger
  @scope = scope
  @group = group
  @queue = queue
  @share = share
  @ident = ident
  @metric = Metric.new(microservice: @name, scope: @scope)
  @metric_output_time = 0
end
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def group
  @group
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def name
  @name
end
#packet ⇒ Object (readonly)
Returns the value of attribute packet.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def packet
  @packet
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def scope
  @scope
end
#target ⇒ Object (readonly)
Returns the value of attribute target.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def target
  @target
end
Instance Method Details
#evaluate(left:, operator:, right:) ⇒ Object
the base evaluate method used by evaluate_trigger
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 370

def evaluate(left:, operator:, right:)
  @logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left} #{operator} #{right})"
  begin
    case operator
    when '>'
      return left > right ? 1 : 0
    when '<'
      return left < right ? 1 : 0
    when '>='
      return left >= right ? 1 : 0
    when '<='
      return left <= right ? 1 : 0
    when '!='
      return left != right ? 1 : 0
    when '=='
      return left == right ? 1 : 0
    when 'AND'
      return left && right ? 1 : 0
    when 'OR'
      return left || right ? 1 : 0
    end
  rescue ArgumentError
    @logger.error "invalid evaluate: (#{left} #{operator} #{right})"
    return -1
  end
end
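The -1 / 0 / 1 convention can be tried outside the microservice with a small standalone sketch. The names `SKETCH_OPERATORS` and `tri_state_compare` are hypothetical and not part of OpenC3, and returning -1 for an unknown operator is an assumption of this sketch rather than something the listing above does.

```ruby
# Hypothetical operator table; each entry returns a truthy or falsy value.
SKETCH_OPERATORS = {
  '>'   => ->(l, r) { l > r },
  '<'   => ->(l, r) { l < r },
  '>='  => ->(l, r) { l >= r },
  '<='  => ->(l, r) { l <= r },
  '!='  => ->(l, r) { l != r },
  '=='  => ->(l, r) { l == r },
  'AND' => ->(l, r) { l && r },
  'OR'  => ->(l, r) { l || r }
}.freeze

# Maps a comparison onto 1 (true), 0 (false), or -1 (error).
def tri_state_compare(left, operator, right)
  op = SKETCH_OPERATORS[operator]
  return -1 if op.nil? # assumption: an unknown operator counts as an error

  op.call(left, right) ? 1 : 0
rescue ArgumentError
  # Raised, for example, when comparing a String with an Integer.
  -1
end

puts tri_state_compare(5, '>', 3)
puts tri_state_compare(5, '<', 3)
puts tri_state_compare('a', '>', 3)
```

Keeping the error case as a distinct number lets the caller tell a trigger that evaluated to false apart from one that could not be evaluated at all.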
#evaluate_data_packet(topic:, triggers:) ⇒ Object
Each packet is evaluated against all of its triggers, and the results are sent back to the topic for use by the reaction microservice.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 303

def evaluate_data_packet(topic:, triggers:)
  visited = Hash.new
  @logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}"
  triggers_to_eval = @share.trigger_base.get_triggers(topic: topic)
  @logger.debug "TriggerGroupWorker-#{@ident} triggers_to_eval: #{triggers_to_eval}"
  triggers_to_eval.each do | trigger |
    @logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}"
    value = evaluate_trigger(
      head: trigger,
      trigger: trigger,
      visited: visited,
      triggers: triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}"
    # value MUST be -1, 0, or 1
    @share.trigger_base.update_state(name: trigger.name, value: value)
  end
end
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing… This is a recursive method called by the TriggerGroupWorkers. It builds keys in the visited Hash by appending __P (path) or __R (result) to the trigger name. The path entry is a Hash with a key for each node traversed to reach a result. Once a trigger's result is known, it is stored under its __R key in the visited Hash and evaluate_trigger returns it as a number:
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
If an operand evaluates to nil, the listing below stores and returns 0 without logging. If a loop is detected, it logs an error and returns -1.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 409

def evaluate_trigger(head:, trigger:, visited:, triggers:)
  if visited["#{trigger.name}__R"]
    return visited["#{trigger.name}__R"]
  end
  if visited["#{trigger.name}__P"].nil?
    visited["#{trigger.name}__P"] = Hash.new
  end
  if visited["#{head.name}__P"][trigger.name]
    # Not sure if this is possible as on create it validates that the dependents are already created
    @logger.error "loop detected from #{head} -> #{trigger} path: #{visited["#{head.name}__P"]}"
    return visited["#{trigger.name}__R"] = -1
  end
  trigger.roots.each do | root_trigger_name |
    next if visited["#{root_trigger_name}__R"]
    root_trigger = triggers[root_trigger_name]
    if head.name == root_trigger.name
      @logger.error "loop detected from #{head} -> #{root_trigger} path: #{visited["#{head.name}__P"]}"
      return visited["#{trigger.name}__R"] = -1
    end
    result = evaluate_trigger(
      head: head,
      trigger: root_trigger,
      visited: visited,
      triggers: triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}"
    visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result
  end
  left = operand_value(operand: trigger.left, other: trigger.right, visited: visited)
  right = operand_value(operand: trigger.right, other: trigger.left, visited: visited)
  if left.nil? || right.nil?
    return visited["#{trigger.name}__R"] = 0
  end
  result = evaluate(left: left, operator: trigger.operator, right: right)
  return visited["#{trigger.name}__R"] = result
end
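The idea of memoizing results under `<name>__R` while walking dependencies depth-first can be illustrated with a reduced sketch. Everything here is hypothetical: `SketchTrigger`, `sketch_evaluate`, and the trigger names are invented for illustration, loops are detected with a simple call stack instead of the `__P` path Hash, and propagating -1 from a root to its dependents is an assumption of this sketch, not behavior taken from the listing above.

```ruby
# Hypothetical stand-in for a trigger: its name, the names of the triggers it
# depends on (roots), and a lambda that computes its own 0/1 result.
SketchTrigger = Struct.new(:name, :roots, :check)

# Depth-first evaluation. Results are memoized in visited under "<name>__R";
# stack holds the names currently being evaluated, so a repeat means a loop.
def sketch_evaluate(trigger, triggers, visited, stack = [])
  key = "#{trigger.name}__R"
  return visited[key] if visited.key?(key)
  return -1 if stack.include?(trigger.name) # loop detected

  stack.push(trigger.name)
  root_results = trigger.roots.map do |root_name|
    sketch_evaluate(triggers.fetch(root_name), triggers, visited, stack)
  end
  stack.pop

  # Assumption: an error in any root marks this trigger as an error too.
  visited[key] = root_results.include?(-1) ? -1 : trigger.check.call(visited)
end

triggers = {
  'TEMP_HIGH' => SketchTrigger.new('TEMP_HIGH', [], ->(_v) { 1 }),
  'ALARM' => SketchTrigger.new('ALARM', ['TEMP_HIGH'],
                               ->(v) { v['TEMP_HIGH__R'] == 1 ? 1 : 0 }),
  'LOOP_A' => SketchTrigger.new('LOOP_A', ['LOOP_B'], ->(_v) { 1 }),
  'LOOP_B' => SketchTrigger.new('LOOP_B', ['LOOP_A'], ->(_v) { 1 })
}

visited = {}
puts sketch_evaluate(triggers['ALARM'], triggers, visited)
puts sketch_evaluate(triggers['LOOP_A'], triggers, visited)
```

Because results are cached in `visited`, a root shared by several triggers is computed once per packet rather than once per dependent.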
#evaluate_wrapper(topic:) ⇒ Object
Times how long each packet takes to evaluate and publishes the duration as a metric.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 293

def evaluate_wrapper(topic:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  evaluate_data_packet(topic: topic, triggers: @share.trigger_base.triggers)
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'trigger_group' => @group, 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: TRIGGER_METRIC_NAME, value: diff, labels: metric_labels)
end
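The timing pattern used here, two reads of the monotonic clock around the work, can be isolated in a few lines. The helper name `timed` is hypothetical, and the summed range merely stands in for evaluating a packet.

```ruby
# Hypothetical helper: runs the block and returns [result, elapsed_seconds].
# CLOCK_MONOTONIC is not affected by wall-clock adjustments, so the
# difference between two readings is a reliable duration.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  [result, elapsed]
end

result, seconds = timed { (1..1000).sum }
puts "result=#{result} took #{format('%.6f', seconds)}s"
```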
#get_packet_limit(operand:, other:) ⇒ Object
Extracts the limit state of the packet item named in the operand. If the limit in the other operand omits _LOW or _HIGH, only the color is matched, so an item limit of GREEN_LOW or GREEN_HIGH equals a limit of GREEN.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 326

def get_packet_limit(operand:, other:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?
  limit = packet["#{operand[ITEM_TYPE]}__L"]
  if limit.nil? == false && limit.include?('_')
    return other[LIMIT_TYPE] if limit.include?(other[LIMIT_TYPE])
  end
  return limit
end
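The color-matching rule is easier to see with a plain Hash standing in for the packet. In this sketch `normalized_limit`, the item name TEMP1, and the sample limit state are all hypothetical; only the `__L` key suffix and the substring match come from the listing above.

```ruby
# Hypothetical helper mirroring the matching rule with a plain Hash packet.
# wanted is the limit from the other operand, e.g. 'GREEN' or 'RED_HIGH'.
def normalized_limit(packet, item_name, wanted)
  limit = packet["#{item_name}__L"]
  return limit if limit.nil? || !limit.include?('_')

  # 'GREEN_LOW' contains 'GREEN', so the wanted value is returned and a
  # later == comparison against 'GREEN' succeeds.
  limit.include?(wanted) ? wanted : limit
end

packet = { 'TEMP1__L' => 'GREEN_LOW' }
puts normalized_limit(packet, 'TEMP1', 'GREEN')
puts normalized_limit(packet, 'TEMP1', 'RED')
```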
#get_packet_value(operand:) ⇒ Object
Extracts the value of the packet item named in the operand. If raw is set in the operand, the raw value is returned instead of the converted value.
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 341

def get_packet_value(operand:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?
  value_type = operand[ITEM_RAW] ? '' : '__C'
  return packet["#{operand[ITEM_TYPE]}#{value_type}"]
end
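The raw versus converted lookup comes down to which Hash key is built. A minimal sketch, assuming a packet represented as a plain Hash with made-up values; `sketch_packet_value` and the item name TEMP1 are hypothetical.

```ruby
# Hypothetical helper: the raw value lives under the bare item name, the
# converted value under the item name plus '__C'.
def sketch_packet_value(packet, operand)
  suffix = operand['raw'] ? '' : '__C'
  packet["#{operand['item']}#{suffix}"]
end

packet = { 'TEMP1' => 2047, 'TEMP1__C' => 25.3 } # made-up sample values
puts sketch_packet_value(packet, { 'item' => 'TEMP1', 'raw' => true })
puts sketch_packet_value(packet, { 'item' => 'TEMP1' })
```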
#operand_value(operand:, other:, visited:) ⇒ Object
extract the value of the operand from the packet
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 353

def operand_value(operand:, other:, visited:)
  if operand[TYPE] == ITEM_TYPE && other[TYPE] == LIMIT_TYPE
    return get_packet_limit(operand: operand, other: other)
  elsif operand[TYPE] == ITEM_TYPE
    return get_packet_value(operand: operand)
  elsif operand[TYPE] == TRIGGER_TYPE
    return visited["#{operand[TRIGGER_TYPE]}__R"] == 1
  else
    return operand[operand[TYPE]]
  end
end
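The dispatch on the operand's `type` key implies a particular Hash shape for each operand kind. The sketch below shows shapes that are consistent with the listing above. The sample operands, the item and trigger names, and `sketch_operand_value` are hypothetical, and the item branch is simplified to the converted value only.

```ruby
# Hypothetical, simplified dispatch over operand Hashes.
def sketch_operand_value(operand, packet, visited)
  case operand['type']
  when 'item'
    packet["#{operand['item']}__C"] # simplified: converted value only
  when 'trigger'
    # A trigger operand resolves to true only if that trigger evaluated to 1.
    visited["#{operand['trigger']}__R"] == 1
  else
    # Literal operands store their value under a key named after the type.
    operand[operand['type']]
  end
end

packet = { 'TEMP1__C' => 25.3 }
visited = { 'TEMP_HIGH__R' => 1 }

puts sketch_operand_value({ 'type' => 'float', 'float' => 10.0 }, packet, visited)
puts sketch_operand_value({ 'type' => 'string', 'string' => 'ON' }, packet, visited)
puts sketch_operand_value({ 'type' => 'item', 'item' => 'TEMP1' }, packet, visited)
puts sketch_operand_value({ 'type' => 'trigger', 'trigger' => 'TEMP_HIGH' }, packet, visited)
```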
#run ⇒ Object
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 273

def run
  @logger.info "TriggerGroupWorker-#{@ident} running"
  loop do
    topic = @queue.pop
    break if topic.nil?
    begin
      evaluate_wrapper(topic: topic)
      current_time = Time.now.to_i
      if @metric_output_time < current_time
        @metric.output
        @metric_output_time = current_time + 120
      end
    rescue StandardError => e
      @logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}"
    end
  end
  @logger.info "TriggerGroupWorker-#{@ident} exiting"
end
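The run loop follows a common worker-pool shape: block on a shared queue, stop on a nil sentinel, and rescue per-item errors so one bad packet does not end the thread. A minimal sketch of that shape using Ruby's built-in Queue follows. The topic names, the worker count of two, and the `results` queue are hypothetical.

```ruby
queue = Queue.new
results = Queue.new

# Start two workers that pop topics until they receive nil.
workers = Array.new(2) do |ident|
  Thread.new do
    loop do
      topic = queue.pop # blocks until something is pushed
      break if topic.nil?

      begin
        results << "worker-#{ident} handled #{topic}"
      rescue StandardError => e
        # Log and keep going so a single failure does not stop the worker.
        warn "worker-#{ident} failed on #{topic}: #{e.message}"
      end
    end
  end
end

%w[topic_a topic_b topic_c].each { |topic| queue << topic }
workers.size.times { queue << nil } # one sentinel per worker
workers.each(&:join)

puts results.size
```

One sentinel per worker is needed because each nil is consumed by exactly one thread.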