Class: OpenC3::TriggerGroupWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/openc3/microservices/trigger_group_microservice.rb

Overview

The TriggerGroupWorker is a very simple thread pool worker. Once the trigger manager has pushed a packet to the queue one of these workers will evaluate the triggers in the kit and evaluate triggers for that packet.

Constant Summary collapse

TYPE =
'type'.freeze
ITEM_RAW =
'raw'.freeze
ITEM_TARGET =
'target'.freeze
ITEM_PACKET =
'packet'.freeze
ITEM_TYPE =
'item'.freeze
FLOAT_TYPE =
'float'.freeze
STRING_TYPE =
'string'.freeze
LIMIT_TYPE =
'limit'.freeze
TRIGGER_TYPE =
'trigger'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, logger:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker

Returns a new instance of TriggerGroupWorker.



259
260
261
262
263
264
265
266
267
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 259

def initialize(name:, logger:, scope:, group:, queue:, share:, ident:)
  @name = name
  @logger = logger
  @scope = scope
  @group = group
  @queue = queue
  @share = share
  @ident = ident
end

Instance Attribute Details

#groupObject (readonly)

Returns the value of attribute group.



257
258
259
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 257

def group
  @group
end

#nameObject (readonly)

Returns the value of attribute name.



257
258
259
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 257

def name
  @name
end

#packetObject (readonly)

Returns the value of attribute packet.



257
258
259
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 257

def packet
  @packet
end

#scopeObject (readonly)

Returns the value of attribute scope.



257
258
259
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 257

def scope
  @scope
end

#targetObject (readonly)

Returns the value of attribute target.



257
258
259
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 257

def target
  @target
end

Instance Method Details

#evaluate(left:, operator:, right:) ⇒ Object

the base evaluate method used by evaluate_trigger

-1 (the value is considered an error used to disable the trigger)
 0 (the value is considered as a false value)
 1 (the value is considered as a true value)


356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 356

def evaluate(left:, operator:, right:)
  @logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left} #{operator} #{right})"
  begin
    case operator
    when '>'
      return left > right ? 1 : 0
    when '<'
      return left < right ? 1 : 0
    when '>='
      return left >= right ? 1 : 0
    when '<='
      return left <= right ? 1 : 0
    when '!='
      return left != right ? 1 : 0
    when '=='
      return left == right ? 1 : 0
    when 'AND'
      return left && right ? 1 : 0
    when 'OR'
      return left || right ? 1 : 0
    end
  rescue ArgumentError
    @logger.error "invalid evaluate: (#{left} #{operator} #{right})"
    return -1
  end
end

#evaluate_data_packet(topic:, triggers:) ⇒ Object

Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 289

def evaluate_data_packet(topic:, triggers:)
  visited = Hash.new
  @logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}"
  triggers_to_eval = @share.trigger_base.get_triggers(topic: topic)
  @logger.debug "TriggerGroupWorker-#{@ident} triggers_to_eval: #{triggers_to_eval}"
  triggers_to_eval.each do | trigger |
    @logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}"
    value = evaluate_trigger(
      head: trigger,
      trigger: trigger,
      visited: visited,
      triggers: triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}"
    # value MUST be -1, 0, or 1
    @share.trigger_base.update_state(name: trigger.name, value: value)
  end
end

#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object

This could be confusing… So this is a recursive method for the TriggerGroupWorkers to call. It will use the trigger name and append a __P for path or __R for result. The Path is a Hash that contains a key for each node traveled to get results. When the result has been found it will be stored in the result key __R in the vistied Hash and eval_trigger will return a number.

-1 (the value is considered an error used to disable the trigger)
 0 (the value is considered as a false value)
 1 (the value is considered as a true value)

IF an operand is evaluated as nil it will log an error and return -1 IF a loop is detected it will log an error and return -1



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 395

def evaluate_trigger(head:, trigger:, visited:, triggers:)
  if visited["#{trigger.name}__R"]
    return visited["#{trigger.name}__R"]
  end
  if visited["#{trigger.name}__P"].nil?
    visited["#{trigger.name}__P"] = Hash.new
  end
  if visited["#{head.name}__P"][trigger.name]
    # Not sure if this is posible as on create it validates that the dependents are already created
    @logger.error "loop detected from #{head} -> #{trigger} path: #{visited["#{head.name}__P"]}"
    return visited["#{trigger.name}__R"] = -1
  end
  trigger.roots.each do | root_trigger_name |
    next if visited["#{root_trigger_name}__R"]
    root_trigger = triggers[root_trigger_name]
    if head.name == root_trigger.name
      @logger.error "loop detected from #{head} -> #{root_trigger} path: #{visited["#{head.name}__P"]}"
      return visited["#{trigger.name}__R"] = -1
    end
    result = evaluate_trigger(
      head: head,
      trigger: root_trigger,
      visited: visited,
      triggers: triggers
    )
    @logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}"
    visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result
  end
  left = operand_value(operand: trigger.left, other: trigger.right, visited: visited)
  right = operand_value(operand: trigger.right, other: trigger.left, visited: visited)
  if left.nil? || right.nil?
    return visited["#{trigger.name}__R"] = 0
  end
  result = evaluate(left: left, operator: trigger.operator, right: right)
  return visited["#{trigger.name}__R"] = result
end

#evaluate_wrapper(topic:) ⇒ Object



283
284
285
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 283

def evaluate_wrapper(topic:)
  evaluate_data_packet(topic: topic, triggers: @share.trigger_base.triggers)
end

#get_packet_limit(operand:, other:) ⇒ Object

extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN



312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 312

def get_packet_limit(operand:, other:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?
  limit = packet["#{operand[ITEM_TYPE]}__L"]
  if limit.nil? == false && limit.include?('_')
    return other[LIMIT_TYPE] if limit.include?(other[LIMIT_TYPE])
  end
  return limit
end

#get_packet_value(operand:) ⇒ Object

extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted



327
328
329
330
331
332
333
334
335
336
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 327

def get_packet_value(operand:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?

  value_type = operand[ITEM_RAW] ? '' : '__C'
  return packet["#{operand[ITEM_TYPE]}#{value_type}"]
end

#operand_value(operand:, other:, visited:) ⇒ Object

extract the value of the operand from the packet



339
340
341
342
343
344
345
346
347
348
349
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 339

def operand_value(operand:, other:, visited:)
  if operand[TYPE] == ITEM_TYPE && other[TYPE] == LIMIT_TYPE
    return get_packet_limit(operand: operand, other: other)
  elsif operand[TYPE] == ITEM_TYPE
    return get_packet_value(operand: operand)
  elsif operand[TYPE] == TRIGGER_TYPE
    return visited["#{operand[TRIGGER_TYPE]}__R"] == 1
  else
    return operand[operand[TYPE]]
  end
end

#runObject



269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/openc3/microservices/trigger_group_microservice.rb', line 269

def run
  @logger.info "TriggerGroupWorker-#{@ident} running"
  loop do
    topic = @queue.pop
    break if topic.nil?
    begin
      evaluate_wrapper(topic: topic)
    rescue StandardError => e
      @logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}"
    end
  end
  @logger.info "TriggerGroupWorker-#{@ident} exiting"
end