Class: OpenC3::MqttInterface
- Defined in:
- lib/openc3/interfaces/mqtt_interface.rb
Overview
Base class for interfaces that send and receive messages over MQTT
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_routers, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #packet_log_writer_pairs, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #scheduler, #secrets, #state, #stored_packet_log_writer_pairs, #stream_log_pair, #target_names, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
-
#connect ⇒ Object
Connects the interface to its target(s).
-
#connected? ⇒ Boolean
Whether the MQTT client exists and reports that it is connected.
-
#disconnect ⇒ Object
Disconnects the interface from its target(s).
-
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
constructor
A new instance of MqttInterface.
- #read ⇒ Object
-
#read_interface ⇒ Object
Reads the next message from the MQTT client and queues its topic for #read.
-
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client certificate authentication; KEY - private key for client certificate authentication; CA_FILE - certificate authority for client certificate authentication (see Interface#set_option).
- #write(packet) ⇒ Object
-
#write_interface(data, extra = nil) ⇒ Object
Writes to the socket.
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?
Methods included from Api
#_get_item, #_limits_group, #_validate_tlm_type, #build_cmd_output_string, #build_command, #cmd, #cmd_implementation, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #connect_interface, #connect_router, #delete_config, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_limits, #enable_limits_group, #get_all_command_names, #get_all_commands, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_target_info, #get_all_telemetry, #get_all_telemetry_names, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_command, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_parameter, #get_router, #get_router_names, #get_saved_config, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_telemetry, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_protocol_cmd, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_protocol_cmd, #save_config, #save_setting, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_tlm, #set_tlm_process_args, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_process_args, #tlm_raw, #tlm_variable, #tlm_with_units
Constructor Details
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 95

# Creates the interface and indexes every telemetry packet by its MQTT topic(s).
#
# @param hostname [String] host of the MQTT server
# @param port [Integer, String] port of the MQTT server; converted with Integer()
# @param ssl [Boolean, String] whether to use SSL; parsed by
#   ConfigParser.handle_true_false
def initialize(hostname, port = 1883, ssl = false)
  super()
  @hostname = hostname
  @port = Integer(port)
  @ssl = ConfigParser.handle_true_false(ssl)
  @username = nil
  @password = nil
  @cert = nil
  @key = nil
  @ca_file = nil

  @write_topics = []
  @read_topics = []

  # Build list of packets by topic
  # NOTE(review): the topic list is read from the packet's META data
  # (TOPIC, falling back to TOPICS) -- confirm `meta` is the intended accessor.
  @read_packets_by_topic = {}
  System.telemetry.all.each do |_target_name, target_packets|
    target_packets.each do |_packet_name, packet|
      topics = packet.meta['TOPIC'] || packet.meta['TOPICS']
      next unless topics

      topics.each do |topic|
        @read_packets_by_topic[topic] = packet
      end
    end
  end
end
Instance Method Details
#connect ⇒ Object
Connects the interface to its target(s)
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 125

# Builds a new MQTT client from the configured settings, connects it, and
# subscribes to every topic that has a telemetry packet registered.
def connect
  @write_topics = []
  @read_topics = []

  @client = MQTT::Client.new
  @client.host = @hostname
  @client.port = @port
  @client.ssl = @ssl

  # Credentials and client certificate settings are only applied when set
  { username: @username, password: @password, cert: @cert, key: @key }.each do |setting, value|
    @client.public_send("#{setting}=", value) if value
  end
  if @ca_file
    @client.ca_file = @ca_file.path
  end

  @client.connect
  @read_packets_by_topic.each_key do |topic|
    Logger.info("#{@name}: Subscribing to #{topic}")
    @client.subscribe(topic)
  end
  super()
end
#connected? ⇒ Boolean
148 149 150 151 152 153 154 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 148

# @return [Boolean] false when no client has been created, otherwise whatever
#   the client reports for connected?
def connected?
  return false unless @client

  @client.connected?
end
#disconnect ⇒ Object
Disconnects the interface from its target(s)
157 158 159 160 161 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 157

# Disconnects the MQTT client, if one exists, and then runs the inherited
# disconnect logic.
#
# Safe to call when no client is present (before #connect or after a previous
# #disconnect); in that case only the inherited logic runs.
def disconnect
  if @client
    @client.disconnect
    @client = nil
  end
  super()
end
#read ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 163

# Reads the next packet and, when its topic maps to a known telemetry packet,
# returns a copy of that packet filled with the received buffer.
#
# @return [Object, nil] the packet, or nil when the inherited read returned
#   no packet
def read
  packet = super()
  return nil unless packet

  # Take the topic only after a packet was read: #read_interface appends the
  # topic of each received message to @read_topics, so shifting beforehand
  # pairs this packet with the previous message's topic (or nil).
  # NOTE(review): presumably the inherited #read calls #read_interface --
  # verify against Interface#read.
  topic = @read_topics.shift
  identified_packet = @read_packets_by_topic[topic]
  if identified_packet
    # Copy so the shared packet definition is not mutated
    identified_packet = identified_packet.dup
    identified_packet.buffer = packet.buffer
    packet = identified_packet
  end
  packet.received_time = nil
  packet
end
#read_interface ⇒ Object
Reads the next message from the MQTT client and queues its topic for #read
191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 191

# Fetches the next message from the MQTT client and queues its topic.
#
# @return [Array, nil] the data and extra (always nil here), or nil when the
#   client returned no data, returned empty data, or an IOError was raised
def read_interface
  topic, payload = @client.get
  if payload.nil?
    Logger.info "#{@name}: read returned nil"
    return nil
  elsif payload.length <= 0
    Logger.info "#{@name}: read returned 0 bytes"
    return nil
  end

  @read_topics << topic
  extra = nil
  read_interface_base(payload, extra)
  [payload, extra]
rescue IOError
  # Disconnected
  nil
end
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client certificate authentication; KEY - private key for client certificate authentication; CA_FILE - certificate authority for client certificate authentication (see Interface#set_option)
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 222

# Stores MQTT-specific options after passing them to the inherited handler.
#
# Recognized names (case-insensitive): USERNAME, PASSWORD, CERT, KEY, CA_FILE.
# Only the first entry of option_values is used.
#
# @param option_name [String] name of the option
# @param option_values [Array] values given for the option
def set_option(option_name, option_values)
  super(option_name, option_values)
  option = option_name.upcase
  if option == 'USERNAME'
    @username = option_values[0]
  elsif option == 'PASSWORD'
    @password = option_values[0]
  elsif option == 'CERT'
    @cert = option_values[0]
  elsif option == 'KEY'
    @key = option_values[0]
  elsif option == 'CA_FILE'
    # CA_FILE must be given as a file, so the value is written to a tempfile
    @ca_file = Tempfile.new('ca_file')
    @ca_file.write(option_values[0])
    @ca_file.close
  end
end
#write(packet) ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 177

# Writes the packet once for each topic listed in its META data.
#
# Each topic is queued in @write_topics before the inherited write runs, so
# #write_interface can pick it up when publishing.
#
# @param packet [Object] command packet; must carry a META TOPIC or TOPICS list
# @raise [RuntimeError] when the packet has neither META TOPIC nor TOPICS
def write(packet)
  # NOTE(review): topics are read from the packet's META data, as the error
  # message below states -- confirm `meta` is the intended accessor.
  topics = packet.meta['TOPIC'] || packet.meta['TOPICS']
  unless topics
    raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
  end

  topics.each do |topic|
    @write_topics << topic
    super(packet)
  end
end
#write_interface(data, extra = nil) ⇒ Object
Writes to the socket
208 209 210 211 212 213 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 208

# Publishes the data to the oldest topic queued in @write_topics.
#
# @param data [String] data to publish
# @param extra [Object, nil] extra data passed through unchanged
# @return [Array] the data and extra
def write_interface(data, extra = nil)
  write_interface_base(data, extra)
  @client.publish(@write_topics.shift, data)
  [data, extra]
end