Class: OpenC3::RouterTlmHandlerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/openc3/microservices/interface_microservice.rb

Instance Method Summary collapse

Constructor Details

#initialize(router, tlm, logger: nil, metric: nil, scope:) ⇒ RouterTlmHandlerThread

Returns a new instance of RouterTlmHandlerThread.



327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/openc3/microservices/interface_microservice.rb', line 327

def initialize(router, tlm, logger: nil, metric: nil, scope:)
  @router = router
  @tlm = tlm
  @scope = scope
  @logger = logger
  @logger = Logger unless @logger
  @metric = metric
  @count = 0
  @directive_count = 0
  @metric.set(name: 'router_directive_total', value: @directive_count, type: 'counter') if @metric
  @metric.set(name: 'router_tlm_total', value: @count, type: 'counter') if @metric
end

Instance Method Details

#graceful_killObject



353
354
355
# File 'lib/openc3/microservices/interface_microservice.rb', line 353

def graceful_kill
  RouterTopic.shutdown(@router, scope: @scope)
end

#runObject



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/openc3/microservices/interface_microservice.rb', line 357

def run
  RouterTopic.receive_telemetry(@router, scope: @scope) do |topic, msg_id, msg_hash, _redis|
    msgid_seconds_from_epoch = msg_id.split('-')[0].to_i / 1000.0
    delta = Time.now.to_f - msgid_seconds_from_epoch
    @metric.set(name: 'router_topic_delta_seconds', value: delta, type: 'gauge', unit: 'seconds', help: 'Delta time between data written to stream and router tlm start') if @metric

    # Check for commands to the router itself
    if /CMD}ROUTER/.match?(topic)
      @directive_count += 1
      @metric.set(name: 'router_directive_total', value: @directive_count, type: 'counter') if @metric

      if msg_hash['shutdown']
        @logger.info "#{@router.name}: Shutdown requested"
        return
      end
      if msg_hash['connect']
        @logger.info "#{@router.name}: Connect requested"
        params = []
        if msg_hash['params']
          params = JSON.parse(msg_hash['params'], :allow_nan => true, :create_additions => true)
        end
        @router = @tlm.attempting(*params)
      end
      if msg_hash['disconnect']
        @logger.info "#{@router.name}: Disconnect requested"
        @tlm.disconnect(false)
      end
      if msg_hash.key?('log_stream')
        if msg_hash['log_stream'] == 'true'
          @logger.info "#{@router.name}: Enable stream logging"
          @router.start_raw_logging
        else
          @logger.info "#{@router.name}: Disable stream logging"
          @router.stop_raw_logging
        end
      end
      if msg_hash.key?('router_cmd')
        params = JSON.parse(msg_hash['router_cmd'], allow_nan: true, create_additions: true)
        begin
          @logger.info "#{@router.name}: router_cmd: #{params['cmd_name']} #{params['cmd_params'].join(' ')}"
          @router.interface_cmd(params['cmd_name'], *params['cmd_params'])
          RouterStatusModel.set(@router.as_json(:allow_nan => true), queued: true, scope: @scope)
        rescue => e
          @logger.error "#{@router.name}: router_cmd: #{e.formatted}"
          next e.message
        end
        next 'SUCCESS'
      end
      if msg_hash.key?('protocol_cmd')
        params = JSON.parse(msg_hash['protocol_cmd'], allow_nan: true, create_additions: true)
        begin
          @logger.info "#{@router.name}: protocol_cmd: #{params['cmd_name']} #{params['cmd_params'].join(' ')} read_write: #{params['read_write']} index: #{params['index']}"
          @router.protocol_cmd(params['cmd_name'], *params['cmd_params'], read_write: params['read_write'], index: params['index'])
          RouterStatusModel.set(@router.as_json(:allow_nan => true), queued: true, scope: @scope)
        rescue => e
          @logger.error "#{@router.name}: protoco_cmd: #{e.formatted}"
          next e.message
        end
        next 'SUCCESS'
      end
      next 'SUCCESS'
    end

    if @router.connected?
      @count += 1
      @metric.set(name: 'router_tlm_total', value: @count, type: 'counter') if @metric

      target_name = msg_hash["target_name"]
      packet_name = msg_hash["packet_name"]

      packet = System.telemetry.packet(target_name, packet_name)
      packet.stored = ConfigParser.handle_true_false(msg_hash["stored"])
      packet.received_time = Time.from_nsec_from_epoch(msg_hash["time"].to_i)
      packet.received_count = msg_hash["received_count"].to_i
      packet.buffer = msg_hash["buffer"]

      begin
        @router.write(packet)
        RouterStatusModel.set(@router.as_json(:allow_nan => true), queued: true, scope: @scope)
        next 'SUCCESS'
      rescue => e
        @logger.error "#{@router.name}: #{e.formatted}"
        next e.message
      end
    end
  end
end

#startObject



340
341
342
343
344
345
346
347
# File 'lib/openc3/microservices/interface_microservice.rb', line 340

def start
  @thread = Thread.new do
    run()
  rescue Exception => e
    @logger.error "#{@router.name}: Telemetry handler thread died: #{e.formatted}"
    raise e
  end
end

#stopObject



349
350
351
# File 'lib/openc3/microservices/interface_microservice.rb', line 349

def stop
  OpenC3.kill_thread(self, @thread)
end