Class: OpenC3::MqttInterface
- Defined in:
- lib/openc3/interfaces/mqtt_interface.rb
Overview
Base class for interfaces that send and receive messages over MQTT
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_routers, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #packet_log_writer_pairs, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #scheduler, #secrets, #state, #stored_packet_log_writer_pairs, #stream_log_pair, #target_names, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
-
#connect ⇒ Object
Connects the interface to its target(s).
-
#connected? ⇒ Boolean
Whether the MQTT client is connected.
- #connection_string ⇒ Object
-
#disconnect ⇒ Object
Disconnects the interface from its target(s).
-
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
constructor
A new instance of MqttInterface.
- #read ⇒ Object
-
#read_interface ⇒ Object
Reads from the client.
-
#set_option(option_name, option_values) ⇒ Object
Supported Options USERNAME - Username for Mqtt Server PASSWORD - Password for Mqtt Server CERT - Public Key for Client Cert Auth KEY - Private Key for Client Cert Auth CA_FILE - Certificate Authority for Client Cert Auth (see Interface#set_option).
- #write(packet) ⇒ Object
-
#write_interface(data, extra = nil) ⇒ Object
Writes to the client.
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #post_connect, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?
Methods included from Api
#_build_cmd_output_string, #_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_protocol_cmd, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_protocol_cmd, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_variable, #tlm_with_units, #update_news
Constructor Details
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
Returns a new instance of MqttInterface.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 96 def initialize(hostname, port = 1883, ssl = false) super() @hostname = hostname @port = Integer(port) @ssl = ConfigParser.handle_true_false(ssl) @ack_timeout = 5.0 @username = nil @password = nil @cert = nil @key = nil @ca_file = nil @write_topics = [] @read_topics = [] # Build list of packets by topic @read_packets_by_topic = {} System.telemetry.all.each do |_target_name, target_packets| target_packets.each do |_packet_name, packet| topics = packet.['TOPIC'] topics = packet.['TOPICS'] unless topics if topics topics.each do |topic| @read_packets_by_topic[topic] = packet end end end end end |
Instance Method Details
#connect ⇒ Object
Connects the interface to its target(s)
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 131 def connect @write_topics = [] @read_topics = [] @client = MQTT::Client.new @client.ack_timeout = @ack_timeout @client.host = @hostname @client.port = @port @client.username = @username if @username @client.password = @password if @password @client.ssl = @ssl if @cert and @key @client.ssl = true @client.cert_file = @cert.path @client.key_file = @key.path end if @ca_file @client.ssl = true @client.ca_file = @ca_file.path end @client.connect @read_packets_by_topic.each do |topic, _| Logger.info "#{@name}: Subscribing to #{topic}" @client.subscribe(topic) end super() end |
#connected? ⇒ Boolean
Returns Whether the MQTT client is connected.
159 160 161 162 163 164 165 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 159 def connected? if @client return @client.connected? else return false end end |
#connection_string ⇒ Object
126 127 128 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 126 def connection_string return "#{@hostname}:#{@port} (ssl: #{@ssl})" end |
#disconnect ⇒ Object
Disconnects the interface from its target(s)
168 169 170 171 172 173 174 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 168 def disconnect if @client @client.disconnect @client = nil end super() end |
#read ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 176 def read packet = super() topic = @read_topics.shift return nil unless packet identified_packet = @read_packets_by_topic[topic] if identified_packet identified_packet = identified_packet.dup identified_packet.buffer = packet.buffer packet = identified_packet end packet.received_time = nil return packet end |
#read_interface ⇒ Object
Reads from the client
206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 206 def read_interface topic, data = @client.get if data.nil? or data.length <= 0 Logger.info "#{@name}: read returned nil" if data.nil? Logger.info "#{@name}: read returned 0 bytes" if not data.nil? and data.length <= 0 return nil end @read_topics << topic extra = nil read_interface_base(data, extra) return data, extra rescue IOError # Disconnected return nil end |
#set_option(option_name, option_values) ⇒ Object
Supported Options USERNAME - Username for Mqtt Server PASSWORD - Password for Mqtt Server CERT - Public Key for Client Cert Auth KEY - Private Key for Client Cert Auth CA_FILE - Certificate Authority for Client Cert Auth (see Interface#set_option)
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 237 def set_option(option_name, option_values) super(option_name, option_values) case option_name.upcase when 'ACK_TIMEOUT' @ack_timeout = Float(option_values[0]) when 'USERNAME' @username = option_values[0] when 'PASSWORD' @password = option_values[0] when 'CERT' # CERT must be given as a file @cert = Tempfile.new('cert') @cert.write(option_values[0]) @cert.close when 'KEY' # KEY must be given as a file @key = Tempfile.new('key') @key.write(option_values[0]) @key.close when 'CA_FILE' # CA_FILE must be given as a file @ca_file = Tempfile.new('ca_file') @ca_file.write(option_values[0]) @ca_file.close end end |
#write(packet) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 190 def write(packet) @write_mutex.synchronize do topics = packet.['TOPIC'] topics = packet.['TOPICS'] unless topics if topics topics.each do |topic| @write_topics << topic super(packet) end else raise "Command packet '#{packet.target_name} #{packet.packet_name}' requires a META TOPIC or TOPICS" end end end |
#write_interface(data, extra = nil) ⇒ Object
Writes to the client
223 224 225 226 227 228 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 223 def write_interface(data, extra = nil) write_interface_base(data, extra) topic = @write_topics.shift @client.publish(topic, data) return data, extra end |