Class: OpenC3::TcpipServerInterface
- Inherits:
-
StreamInterface
- Object
- Interface
- StreamInterface
- OpenC3::TcpipServerInterface
- Defined in:
- lib/openc3/interfaces/tcpip_server_interface.rb
Overview
TCP/IP Server which can both read and write on a single port or two independent ports. A listen thread is setup which waits for client connections. For each connection to the read port, a thread is spawned that calls the read method from the interface. This data is then available by calling the TcpipServer read method. For each connection to the write port, a thread is spawned that calls the write method from the interface when data is send to the TcpipServer via the write method.
Defined Under Namespace
Classes: InterfaceInfo
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary collapse
-
#listen_address ⇒ String
The ip address to bind to.
-
#read_connection_callback ⇒ Object
Callback method to call when a new client connects to the read port.
-
#stream_log_pair ⇒ StreamLogPair
StreamLogPair instance or nil.
-
#write_connection_callback ⇒ Object
Callback method to call when a new client connects to the write port.
Attributes inherited from StreamInterface
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_target_enabled, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #options, #protocol_info, #read_count, #read_protocols, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #save_raw_data, #scheduler, #secrets, #state, #target_names, #tlm_target_enabled, #tlm_target_names, #write_count, #write_protocols, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
- #change_raw_logging(method) ⇒ Object
- #check_for_dead_clients ⇒ Object
-
#connect ⇒ Object
Create the read and write port listen threads.
-
#connected? ⇒ Boolean
Whether the server is listening for connections.
- #connection_string ⇒ Object
- #details ⇒ Object
-
#disconnect ⇒ Object
Shutdowns the listener threads for both the read and write ports as well as any client connections.
-
#graceful_kill ⇒ Object
Gracefully kill all the threads.
-
#initialize(write_port, read_port, write_timeout, read_timeout, protocol_type = nil, *protocol_args) ⇒ TcpipServerInterface
constructor
A new instance of TcpipServerInterface.
- #listen_thread_body(listen_socket, listen_write, listen_read, thread_reader) ⇒ Object
-
#num_clients ⇒ Integer
The number of connected clients.
-
#read ⇒ Packet
Latest packet read from any of the connected clients.
-
#read_queue_size ⇒ Integer
The number of packets waiting on the read queue.
- #read_thread_body(interface) ⇒ Object
-
#read_thread_hook(packet) ⇒ Packet
Return the packet.
-
#set_option(option_name, option_values) ⇒ Object
Supported Options LISTEN_ADDRESS - Ip address of the interface to accept connections on - Default: 0.0.0.0 (see Interface#set_option).
-
#shutdown_interfaces(interface_infos) ⇒ Object
protected.
- #start_listen_thread(port, listen_write = false, listen_read = false) ⇒ Object
-
#start_raw_logging ⇒ Object
Start raw logging for this interface.
- #start_read_thread(interface_info) ⇒ Object
-
#stop_raw_logging ⇒ Object
Stop raw logging for this interface.
- #write(packet) ⇒ Object
-
#write_queue_size ⇒ Integer
The number of packets waiting on the write queue.
- #write_raw(data) ⇒ Object
- #write_raw_thread_body ⇒ Object
- #write_raw_thread_hook(data) ⇒ Object
- #write_thread_body ⇒ Object
- #write_thread_hook(packet) ⇒ Object
- #write_to_clients(method, packet_or_data) ⇒ Object
Methods inherited from StreamInterface
#read_interface, #write_interface
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #post_connect, #protocol_cmd, #read_allowed?, #read_interface, #read_interface_base, #write_allowed?, #write_interface, #write_interface_base, #write_raw_allowed?
Methods included from Api
#_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_available, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_details, #interface_protocol_cmd, #interface_target_disable, #interface_target_enable, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #map_target_to_router, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_details, #router_protocol_cmd, #router_target_disable, #router_target_enable, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_variable, #tlm_with_units, #unmap_target_from_interface, #unmap_target_from_router, #update_news, #update_plugin_store
Methods included from CmdLog
Constructor Details
#initialize(write_port, read_port, write_timeout, read_timeout, protocol_type = nil, *protocol_args) ⇒ TcpipServerInterface
Returns a new instance of TcpipServerInterface.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 72 def initialize(write_port, read_port, write_timeout, read_timeout, protocol_type = nil, *protocol_args) super(protocol_type, protocol_args) @write_port = ConfigParser.handle_nil(write_port) @write_port = Integer(write_port) if @write_port @read_port = ConfigParser.handle_nil(read_port) @read_port = Integer(read_port) if @read_port @write_timeout = ConfigParser.handle_nil(write_timeout) @write_timeout = @write_timeout.to_f if @write_timeout @read_timeout = ConfigParser.handle_nil(read_timeout) @read_timeout = @read_timeout.to_f if @read_timeout @listen_sockets = [] @listen_pipes = [] @listen_threads = [] @read_threads = [] @write_thread = nil @write_raw_thread = nil @write_interface_infos = [] @read_interface_infos = [] @write_queue = nil @write_queue = Queue.new if @write_port @write_raw_queue = nil @write_raw_queue = Queue.new if @write_port @read_queue = nil @read_queue = Queue.new if @read_port @write_condition_variable = nil @write_condition_variable = ConditionVariable.new if @write_port @write_raw_mutex = nil @write_raw_mutex = Mutex.new if @write_port @write_raw_condition_variable = nil @write_raw_condition_variable = ConditionVariable.new if @write_port @write_connection_callback = nil @read_connection_callback = nil @stream_log_pair = nil @raw_logging_enabled = false @connection_mutex = Mutex.new @listen_address = "0.0.0.0" @read_allowed = false unless ConfigParser.handle_nil(read_port) @write_allowed = false unless ConfigParser.handle_nil(write_port) @write_raw_allowed = false unless ConfigParser.handle_nil(write_port) @connected = false end |
Instance Attribute Details
#listen_address ⇒ String
Returns The ip address to bind to. Default to ANY (0.0.0.0).
59 60 61 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 59 def listen_address @listen_address end |
#read_connection_callback ⇒ Object
Callback method to call when a new client connects to the read port. This method will be called with the Interface as the only argument.
55 56 57 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 55 def read_connection_callback @read_connection_callback end |
#stream_log_pair ⇒ StreamLogPair
Returns StreamLogPair instance or nil.
57 58 59 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 57 def stream_log_pair @stream_log_pair end |
#write_connection_callback ⇒ Object
Callback method to call when a new client connects to the write port. This method will be called with the Interface as the only argument.
52 53 54 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 52 def write_connection_callback @write_connection_callback end |
Instance Method Details
#change_raw_logging(method) ⇒ Object
312 313 314 315 316 317 318 319 320 321 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 312 def change_raw_logging(method) if @stream_log_pair @write_interface_infos.each do |interface_info| interface_info.interface.stream_log_pair.public_send(method) if interface_info.interface.stream_log_pair end @read_interface_infos.each do |interface_info| interface_info.interface.stream_log_pair.public_send(method) if interface_info.interface.stream_log_pair end end end |
#check_for_dead_clients ⇒ Object
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 548 def check_for_dead_clients indexes_to_delete = [] index = 0 @connection_mutex.synchronize do @write_interface_infos.each do |interface_info| if @write_port != @read_port # Socket should return EWOULDBLOCK if it is still cleanly connected interface_info.interface.stream.write_socket.recvfrom_nonblock(10) elsif !interface_info.interface.stream.write_socket.closed? # Let read thread detect disconnect next end # Client has disconnected (or is invalidly sending data on the socket) Logger.info "#{@name}: Tcpip server lost write connection to #{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}" interface_info.interface.disconnect interface_info.interface.stream_log_pair.stop if interface_info.interface.stream_log_pair indexes_to_delete.unshift(index) # Put later indexes at front of array rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError # Client has disconnected Logger.info "#{@name}: Tcpip server lost write connection to #{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}" interface_info.interface.disconnect interface_info.interface.stream_log_pair.stop if interface_info.interface.stream_log_pair indexes_to_delete.unshift(index) # Put later indexes at front of array rescue Errno::EWOULDBLOCK # Client is still cleanly connected as far as we can tell without writing to the socket ensure index += 1 end # Delete any dead sockets indexes_to_delete.each do |index_to_delete| @write_interface_infos.delete_at(index_to_delete) end end # connection_mutex.synchronize # Sleep until we receive a packet or for 100ms @write_mutex.synchronize do @write_condition_variable.wait(@write_mutex, 0.1) end end |
#connect ⇒ Object
Create the read and write port listen threads. Incoming connections will spawn separate threads to process the reads and writes.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 133 def connect @cancel_threads = false @read_queue.clear if @read_queue if @write_port == @read_port # One socket start_listen_thread(@read_port, true, true) else start_listen_thread(@write_port, true, false) if @write_port start_listen_thread(@read_port, false, true) if @read_port end if @write_port @write_thread = Thread.new do loop do write_thread_body() break if @cancel_threads end rescue Exception => e shutdown_interfaces(@write_interface_infos) Logger.error("#{@name}: Tcpip server write thread unexpectedly died") Logger.error(e.formatted) end @write_raw_thread = Thread.new do loop do write_raw_thread_body() break if @cancel_threads end rescue Exception => e shutdown_interfaces(@write_interface_infos) Logger.error("#{@name}: Tcpip server write raw thread unexpectedly died") Logger.error(e.formatted) end else @write_thread = nil @write_raw_thread = nil end super() @connected = true end |
#connected? ⇒ Boolean
Returns Whether the server is listening for connections.
173 174 175 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 173 def connected? @connected end |
#connection_string ⇒ Object
121 122 123 124 125 126 127 128 129 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 121 def connection_string if @write_port == @read_port return "listening on #{@listen_address}:#{@write_port} (R/W)" end result = "listening on" result += " #{@listen_address}:#{@write_port} (write)" if @write_port result += " #{@listen_address}:#{@read_port} (read)" if @read_port return result end |
#details ⇒ Object
630 631 632 633 634 635 636 637 638 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 630 def details result = super() result['write_port'] = @write_port result['read_port'] = @read_port result['write_timeout'] = @write_timeout result['read_timeout'] = @read_timeout result['listen_address'] = @listen_address return result end |
#disconnect ⇒ Object
Shutdowns the listener threads for both the read and write ports as well as any client connections.
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 179 def disconnect @cancel_threads = true @read_queue << nil if @read_queue @listen_pipes.each do |pipe| pipe.write('.') rescue Exception # Oh well end @listen_pipes.clear # Shutdown listen thread(s) @listen_threads.each { |listen_thread| OpenC3.kill_thread(self, listen_thread) } @listen_threads.clear # Shutdown listen socket(s) @listen_sockets.each do |listen_socket| OpenC3.close_socket(listen_socket) rescue IOError # Ok may have been closed by the thread end @listen_sockets.clear # This will unblock read threads shutdown_interfaces(@read_interface_infos) @read_threads.each { |thread| OpenC3.kill_thread(self, thread) } @read_threads.clear if @write_thread OpenC3.kill_thread(self, @write_thread) @write_thread = nil end if @write_raw_thread OpenC3.kill_thread(self, @write_raw_thread) @write_raw_thread = nil end shutdown_interfaces(@write_interface_infos) @connected = false super() end |
#graceful_kill ⇒ Object
Gracefully kill all the threads
221 222 223 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 221 def graceful_kill # This method is just here to prevent warnings end |
#listen_thread_body(listen_socket, listen_write, listen_read, thread_reader) ⇒ Object
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 367 def listen_thread_body(listen_socket, listen_write, listen_read, thread_reader) begin socket, address = listen_socket.accept_nonblock rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK read_ready, _ = IO.select([listen_socket, thread_reader]) if read_ready && read_ready.include?(thread_reader) return else retry end end port, host_ip = Socket.unpack_sockaddr_in(address) hostname = '' hostname = Socket.lookup_hostname_from_ip(host_ip) # if System.instance.acl # addr = ["AF_INET", 10, "lc630", host_ip.to_s] # if not System.instance.acl.allow_addr?(addr) # # Reject connection # OpenC3.close_socket(socket) # Logger.info "#{@name}: Tcpip server rejected connection from #{hostname}(#{host_ip}):#{port}" # return # end # end # Configure TCP_NODELAY option socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) # Accept Connection write_socket = nil read_socket = nil write_socket = socket if listen_write read_socket = socket if listen_read stream = TcpipSocketStream.new(write_socket, read_socket, @write_timeout, @read_timeout) # Pass down options to the stream @options.each do |option_name, option_values| stream.set_option(option_name, option_values) end interface = StreamInterface.new interface.target_names = @target_names interface.cmd_target_names = @cmd_target_names interface.tlm_target_names = @tlm_target_names if @stream_log_pair interface.stream_log_pair = @stream_log_pair.clone interface.stream_log_pair.start if @raw_logging_enabled end @protocol_info.each do |protocol_class, protocol_args, read_write| interface.add_protocol(protocol_class, protocol_args, read_write) end interface.stream = stream interface.connect if listen_write @write_connection_callback.call(interface) if @write_connection_callback @connection_mutex.synchronize do @write_interface_infos << InterfaceInfo.new(interface, hostname, host_ip, port) end end if listen_read @read_connection_callback.call(interface) if @read_connection_callback @connection_mutex.synchronize do @read_interface_infos << InterfaceInfo.new(interface, hostname, host_ip, port) end start_read_thread(@read_interface_infos[-1]) end Logger.info "#{@name}: Tcpip server accepted connection from #{hostname}(#{host_ip}):#{port}" end |
#num_clients ⇒ Integer
Returns The number of connected clients.
271 272 273 274 275 276 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 271 def num_clients interfaces = [] @write_interface_infos.each { |wii| interfaces << wii.interface } @read_interface_infos.each { |rii| interfaces << rii.interface } interfaces.uniq.length end |
#read ⇒ Packet
Returns Latest packet read from any of the connected clients. Note this method blocks until data is available.
227 228 229 230 231 232 233 234 235 236 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 227 def read raise "Interface not connected for read: #{@name}" unless connected? raise "Interface not readable: #{@name}" unless read_allowed? packet = @read_queue.pop return nil unless packet @read_count += 1 packet end |
#read_queue_size ⇒ Integer
Returns The number of packets waiting on the read queue.
261 262 263 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 261 def read_queue_size @read_queue ? @read_queue.size : 0 end |
#read_thread_body(interface) ⇒ Object
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 524 def read_thread_body(interface) thread_bytes_read = 0 loop do packet = interface.read interface_bytes_read = interface.bytes_read if interface_bytes_read != thread_bytes_read diff = interface_bytes_read - thread_bytes_read @bytes_read += diff # This would be better if mutex protected, but not that important for telemetry thread_bytes_read = interface_bytes_read end return if !packet || @cancel_threads packet = read_thread_hook(packet) # Do work on received packet @read_raw_data_time = interface.read_raw_data_time @read_raw_data = interface.read_raw_data @read_queue << packet.clone end end |
#read_thread_hook(packet) ⇒ Packet
Return the packet
544 545 546 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 544 def read_thread_hook(packet) packet end |
#set_option(option_name, option_values) ⇒ Object
Supported Options LISTEN_ADDRESS - Ip address of the interface to accept connections on - Default: 0.0.0.0 (see Interface#set_option)
293 294 295 296 297 298 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 293 def set_option(option_name, option_values) super(option_name, option_values) if option_name.upcase == 'LISTEN_ADDRESS' @listen_address = option_values[0] end end |
#shutdown_interfaces(interface_infos) ⇒ Object
protected
302 303 304 305 306 307 308 309 310 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 302 def shutdown_interfaces(interface_infos) @connection_mutex.synchronize do interface_infos.each do |interface_info| interface_info.interface.disconnect interface_info.interface.stream_log_pair.stop if interface_info.interface.stream_log_pair end interface_infos.clear end end |
#start_listen_thread(port, listen_write = false, listen_read = false) ⇒ Object
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 323 def start_listen_thread(port, listen_write = false, listen_read = false) # Create a socket to accept connections from clients addr = Socket.pack_sockaddr_in(port, @listen_address) if RUBY_ENGINE == 'ruby' listen_socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows? begin listen_socket.bind(addr) rescue Errno::EADDRINUSE raise "Error binding to port #{port}.\n" + "Either another application is using this port\n" + "or the operating system is being slow cleaning up.\n" + "Make sure all sockets/streams are closed in all applications,\n" + "wait 1 minute and try again." end listen_socket.listen(5) else listen_socket = ServerSocket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows? begin listen_socket.bind(addr, 5) rescue Errno::EADDRINUSE raise "Error binding to port #{port}.\n" + "Either another application is using this port\n" + "or the operating system is being slow cleaning up.\n" + "Make sure all sockets/streams are closed in all applications,\n" + "wait 1 minute and try again." end end @listen_sockets << listen_socket @listen_threads << Thread.new do thread_reader, thread_writer = IO.pipe @listen_pipes << thread_writer loop do listen_thread_body(listen_socket, listen_write, listen_read, thread_reader) break if @cancel_threads end rescue => e Logger.error("#{@name}: Tcpip server listen thread unexpectedly died") Logger.error(e.formatted) end end |
#start_raw_logging ⇒ Object
Start raw logging for this interface
279 280 281 282 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 279 def start_raw_logging @raw_logging_enabled = true change_raw_logging(:start) end |
#start_read_thread(interface_info) ⇒ Object
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 437 def start_read_thread(interface_info) @read_threads << Thread.new do index_to_delete = nil begin begin read_thread_body(interface_info.interface) rescue Exception => e Logger.error "#{@name}: Tcpip server read thread unexpectedly died" Logger.error e.formatted end Logger.info "#{@name}: Tcpip server lost read connection to #{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}" @read_threads.delete(Thread.current) index_to_delete = nil @connection_mutex.synchronize do index = 0 @read_interface_infos.each do |read_interface_info| if interface_info.interface == read_interface_info.interface index_to_delete = index read_interface_info.interface.disconnect read_interface_info.interface.stream_log_pair.stop if read_interface_info.interface.stream_log_pair break end index += 1 end ensure if index_to_delete @read_interface_infos.delete_at(index_to_delete) end end rescue Exception => e Logger.error "#{@name}: Tcpip server read thread unexpectedly died" Logger.error e.formatted end end end |
#stop_raw_logging ⇒ Object
Stop raw logging for this interface
285 286 287 288 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 285 def stop_raw_logging @raw_logging_enabled = false change_raw_logging(:stop) end |
#write(packet) ⇒ Object
240 241 242 243 244 245 246 247 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 240 def write(packet) raise "Interface not connected for write: #{@name}" unless connected? raise "Interface not writable: #{@name}" unless write_allowed? @write_count += 1 @write_queue << packet.clone @write_condition_variable.broadcast end |
#write_queue_size ⇒ Integer
Returns The number of packets waiting on the write queue.
266 267 268 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 266 def write_queue_size @write_queue ? @write_queue.size : 0 end |
#write_raw(data) ⇒ Object
251 252 253 254 255 256 257 258 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 251 def write_raw(data) raise "Interface not connected for write_raw: #{@name}" unless connected? raise "Interface not write-rawable: #{@name}" unless write_raw_allowed? @write_raw_queue << data @write_raw_condition_variable.broadcast return data end |
#write_raw_thread_body ⇒ Object
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 494 def write_raw_thread_body # Retrieve the next data to be sent out to clients data = nil loop do break if @cancel_threads begin data = @write_raw_queue.pop(true) # non_block to raise ThreadError break rescue ThreadError # Sleep until we receive data or for 100ms @write_raw_mutex.synchronize do @write_raw_condition_variable.wait(@write_raw_mutex, 0.1) end end end data = write_raw_thread_hook(data) write_to_clients(:write_raw, data) if data end |
#write_raw_thread_hook(data) ⇒ Object
520 521 522 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 520 def write_raw_thread_hook(data) data # By default just return the data end |
#write_thread_body ⇒ Object
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 474 def write_thread_body # Retrieve the next packet to be sent out to clients # Handles disconnected clients even when packets aren't flowing packet = nil loop do break if @cancel_threads begin packet = @write_queue.pop(true) # non_block to raise ThreadError break rescue ThreadError check_for_dead_clients() end end packet = write_thread_hook(packet) write_to_clients(:write, packet) if packet end |
#write_thread_hook(packet) ⇒ Object
516 517 518 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 516 def write_thread_hook(packet) packet # By default just return the packet end |
#write_to_clients(method, packet_or_data) ⇒ Object
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 |
# File 'lib/openc3/interfaces/tcpip_server_interface.rb', line 590 def write_to_clients(method, packet_or_data) @connection_mutex.synchronize do # Send data to each client - On error drop the client indexes_to_delete = [] index = 0 @write_interface_infos.each do |interface_info| need_disconnect = false begin interface_bytes_written = interface_info.interface.bytes_written interface_info.interface.public_send(method, packet_or_data) diff = interface_info.interface.bytes_written - interface_bytes_written @written_raw_data_time = interface_info.interface.written_raw_data_time @written_raw_data = interface_info.interface.written_raw_data @bytes_written += diff rescue Errno::EPIPE, Errno::ECONNABORTED, IOError, Errno::ECONNRESET # Client has normally disconnected need_disconnect = true rescue Exception => e if e. != "Stream not connected for write_raw" Logger.error "#{@name}: Error sending to client: #{e.class} #{e.}" end need_disconnect = true end if need_disconnect Logger.info "#{@name}: Tcpip server lost write connection to #{interface_info.hostname}(#{interface_info.host_ip}):#{interface_info.port}" interface_info.interface.disconnect interface_info.interface.stream_log_pair.stop if interface_info.interface.stream_log_pair indexes_to_delete.unshift(index) # Put later indexes at front of array end index += 1 end # Delete any dead sockets indexes_to_delete.each do |index_to_delete| @write_interface_infos.delete_at(index_to_delete) end end # connection_mutex.synchronize end |