Class: OpenC3::QueueProcessor

Inherits:
Object
  • Object
show all
Includes:
Api
Defined in:
lib/openc3/microservices/queue_microservice.rb

Overview

The queue processor runs in a single thread and processes commands via cmd_api.

Constant Summary

Constants included from Api

Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS

Constants included from ApiShared

ApiShared::DEFAULT_TLM_POLLING_RATE

Constants included from Extract

Extract::SCANNING_REGULAR_EXPRESSION

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Api

#_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_available, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_details, #interface_protocol_cmd, #interface_target_disable, #interface_target_enable, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #map_target_to_router, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_details, #router_protocol_cmd, #router_target_disable, #router_target_enable, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, 
#stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_variable, #tlm_with_units, #unmap_target_from_interface, #unmap_target_from_router, #update_news, #update_plugin_store

Methods included from CmdLog

#_build_cmd_output_string

Constructor Details

#initialize(name:, state:, logger:, scope:) ⇒ QueueProcessor

Returns a new instance of QueueProcessor.



41
42
43
44
45
46
47
# File 'lib/openc3/microservices/queue_microservice.rb', line 41

# Builds a queue processor bound to a single named queue.
#
# @param name [String] queue name; combined with scope to form the store key
# @param state [String] initial processing state; commands are only processed
#   while it equals 'RELEASE'
# @param logger [#error] receives error messages raised during processing
# @param scope [String] scope prefix used for the store key and passed to cmd()
def initialize(name:, state:, logger:, scope:)
  # Stored in the same order as the keyword signature
  @name = name
  @state = state
  @logger = logger
  @scope = scope
  # Starts cleared; shutdown sets it to stop the processing loops
  @cancel_thread = false
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



39
40
41
# File 'lib/openc3/microservices/queue_microservice.rb', line 39

# Reader for the name attribute.
#
# @return [Object] the value held in @name (assigned from the name: keyword
#   in the constructor and used as the second half of the queue store key)
def name
  @name
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



39
40
41
# File 'lib/openc3/microservices/queue_microservice.rb', line 39

# Reader for the scope attribute.
#
# @return [Object] the value held in @scope (assigned from the scope: keyword
#   in the constructor; prefixes the queue store key and is passed to cmd())
def scope
  @scope
end

#state ⇒ Object

Returns the value of attribute state.



38
39
40
# File 'lib/openc3/microservices/queue_microservice.rb', line 38

# Reader for the state attribute.
#
# The processing loops compare this value against 'RELEASE' on every pass,
# so it decides whether queued commands are processed or the thread idles.
# NOTE(review): this attribute is not marked readonly, so a matching writer
# presumably exists outside this listing — confirm.
#
# @return [Object] the value held in @state
def state
  @state
end

Instance Method Details

#process_queued_commands ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/openc3/microservices/queue_microservice.rb', line 60

# Pops entries from the "<scope>:<name>" store key and sends each one through
# cmd() for as long as the processor state remains 'RELEASE'.
#
# A failure while popping, parsing or sending an entry is logged and the loop
# moves on; the loop also ends once the cancel flag has been set.
#
# @return [nil]
def process_queued_commands
  while @state == 'RELEASE'
    begin
      # Short timeout so state changes and the cancel flag are re-checked often;
      # the `if payload` guard covers a pop that produced no entry
      _key, payload, _score = Store.bzpopmin("#{@scope}:#{@name}", timeout: 0.2)
      if payload
        entry = JSON.parse(payload)
        target = entry['target_name']
        cmd_name = entry['cmd_name']

        # queue: false is essential on both cmd() calls below: with
        # OPENC3_DEFAULT_QUEUE set, the command would otherwise be re-queued to
        # the default queue and recurse forever.
        # NOTE: cmd() via script rescues hazardous errors and calls
        # prompt_for_hazardous(), which is overridden to always return true, so
        # execution goes straight to cmd_no_hazardous_check()
        if target && cmd_name
          # New format: separate target / command / JSON-encoded parameters
          encoded_params = entry['cmd_params']
          params = encoded_params ? JSON.parse(encoded_params, allow_nan: true, create_additions: true) : {}
          cmd(target, cmd_name, params, queue: false, scope: @scope)
        elsif (legacy = entry['value'])
          # Legacy format: the whole command as one string, kept for
          # backwards compatibility
          cmd(legacy, queue: false, scope: @scope)
        else
          @logger.error "QueueProcessor: Invalid command format, missing required fields"
        end
      end
    rescue StandardError => e
      @logger.error "QueueProcessor failed to process command from queue #{@name}\n#{e.message}"
    end
    # Checked after every pass, including passes that popped nothing or failed
    break if @cancel_thread
  end
end

#run ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/openc3/microservices/queue_microservice.rb', line 49

# Main loop of the processor: processes queued commands while the state is
# 'RELEASE' and otherwise idles in 0.2 second naps, until the cancel flag is set.
#
# The cancel flag is examined only after each pass, so the body always runs at
# least once.
#
# @return [nil]
def run
  loop do
    if @state != 'RELEASE'
      # Not released: wait briefly before looking at the state again
      sleep 0.2
    else
      process_queued_commands
    end
    break if @cancel_thread
  end
end

#shutdown ⇒ Object



94
95
96
# File 'lib/openc3/microservices/queue_microservice.rb', line 94

# Requests that the processor stop.
#
# Only raises the cancel flag; the loops in run and process_queued_commands
# test that flag at the end of each pass, so any pass already in progress
# finishes before they exit.
#
# @return [true] the value assigned to the flag
def shutdown
  @cancel_thread = true
end