Class: OpenC3::QueueProcessor
- Includes:
- Api
- Defined in:
- lib/openc3/microservices/queue_microservice.rb
Overview
The queue processor runs in a single thread and processes commands via cmd_api.
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#state ⇒ Object
Returns the value of attribute state.
Instance Method Summary collapse
-
#initialize(name:, state:, logger:, scope:) ⇒ QueueProcessor
constructor
A new instance of QueueProcessor.
- #process_queued_commands ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
Methods included from Api
#_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_available, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_details, #interface_protocol_cmd, #interface_target_disable, #interface_target_enable, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #map_target_to_router, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_details, #router_protocol_cmd, #router_target_disable, #router_target_enable, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, 
#stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_variable, #tlm_with_units, #unmap_target_from_interface, #unmap_target_from_router, #update_news, #update_plugin_store
Methods included from CmdLog
Constructor Details
#initialize(name:, state:, logger:, scope:) ⇒ QueueProcessor
Returns a new instance of QueueProcessor.
41 42 43 44 45 46 47 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 41

# Returns a new instance of QueueProcessor.
#
# Stores the given collaborators in instance variables and clears the
# cancel flag that #run and #process_queued_commands check after each pass.
#
# @param name [Object] queue name; interpolated into the store key and into
#   error log messages (presumably a String - confirm against callers)
# @param state [Object] initial state; processing only happens while it
#   equals 'RELEASE'
# @param logger [Object] receives #error calls when a command fails
# @param scope [Object] interpolated into the store key and passed to cmd()
def initialize(name:, state:, logger:, scope:)
  @name = name
  @logger = logger
  @scope = scope
  @state = state
  # Set to true by #shutdown to make the processing loops exit
  @cancel_thread = false
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
39 40 41 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 39

# Returns the value of attribute name (read-only).
#
# @return [Object] the name given to the constructor
def name
  @name
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
39 40 41 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 39

# Returns the value of attribute scope (read-only).
#
# @return [Object] the scope given to the constructor
def scope
  @scope
end
#state ⇒ Object
Returns the value of attribute state.
38 39 40 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 38

# Returns the value of attribute state.
#
# @return [Object] the current state; commands are only processed while it
#   equals 'RELEASE'
def state
  @state
end
Instance Method Details
#process_queued_commands ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 60

# Pops commands from the "<scope>:<name>" queue and sends them with cmd()
# for as long as @state is 'RELEASE'.
#
# Each pass waits up to 0.2 seconds for an entry. Any StandardError raised
# while popping, parsing or sending is logged through @logger and the loop
# carries on with the next entry. The loop ends when @state is no longer
# 'RELEASE' or when @cancel_thread is set.
#
# @return [nil]
def process_queued_commands
  while @state == 'RELEASE'
    begin
      _queue_name, command_data, = Store.bzpopmin("#{@scope}:#{@name}", timeout: 0.2)
      if command_data
        command = JSON.parse(command_data)
        # It's important to set queue: false here to avoid infinite recursion when
        # OPENC3_DEFAULT_QUEUE is set because commands would be re-queued to the default queue
        # NOTE: cmd() via script rescues hazardous errors and calls prompt_for_hazardous()
        # but we've overridden it to always return true and go straight to cmd_no_hazardous_check()
        # Support both new format (target_name, cmd_name, cmd_params) and legacy format (command string)
        if command['target_name'] && command['cmd_name']
          # New format: use 3-parameter cmd() method
          if command['cmd_params']
            # NOTE(review): create_additions: true lets the JSON payload name Ruby
            # classes to instantiate - confirm queue contents are always trusted
            cmd_params = JSON.parse(command['cmd_params'], allow_nan: true, create_additions: true)
          else
            cmd_params = {}
          end
          cmd(command['target_name'], command['cmd_name'], cmd_params, queue: false, scope: @scope)
        elsif command['value']
          # Legacy format: use single string parameter for backwards compatibility
          cmd(command['value'], queue: false, scope: @scope)
        else
          @logger.error "QueueProcessor: Invalid command format, missing required fields"
        end
      end
    rescue StandardError => e
      @logger.error "QueueProcessor failed to process command from queue #{@name}\n#{e.message}"
    end
    # Checked after every pass so #shutdown takes effect even while state stays 'RELEASE'
    break if @cancel_thread
  end
end
#run ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 49

# Main loop of the processor.
#
# While @state is 'RELEASE' it hands control to #process_queued_commands;
# in any other state it sleeps 0.2 seconds before checking again. The loop
# exits once @cancel_thread is set.
#
# @return [nil]
def run
  loop do
    if @state == 'RELEASE'
      process_queued_commands
    else
      sleep 0.2
    end
    break if @cancel_thread
  end
end
#shutdown ⇒ Object
94 95 96 |
# File 'lib/openc3/microservices/queue_microservice.rb', line 94

# Requests that the processing loops stop.
#
# Only sets the @cancel_thread flag; #run and #process_queued_commands
# check it at the end of each pass, so the exit is not immediate.
#
# @return [true]
def shutdown
  @cancel_thread = true
end