Class: Roby::Interface::V1::TCPServer
- Defined in:
- lib/roby/interface/v1/tcp.rb
Overview
An object that publishes a Roby interface using a TCP server
Instance Attribute Summary collapse
-
#clients ⇒ Array<Client>
readonly
Set of currently active clients.
-
#interface ⇒ Interface
readonly
The interface object we give access to.
-
#server ⇒ ::TCPServer
readonly
The TCP server we are accepting from.
Instance Method Summary collapse
-
#client_count(handshake: true) ⇒ Integer
Number of clients connected to this server.
-
#close ⇒ Object
Closes this server.
-
#create_server(socket) ⇒ Server
Creates a server object that will manage the replies on a particular TCP socket.
-
#has_client?(client) ⇒ Boolean
Whether the given client is handled by this server.
-
#initialize(app, host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ TCPServer
constructor
Creates a new interface server on the given port.
-
#ip_address ⇒ String
The address this interface is bound to.
-
#ip_port ⇒ Integer
The port on which this interface runs.
- #open_tcp_server(host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ Object
-
#port ⇒ Integer
Returns the port this server is bound to.
-
#process_pending_requests ⇒ Object
Process all incoming connection requests.
- #queue_accept_future ⇒ Object
Constructor Details
#initialize(app, host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ TCPServer
Creates a new interface server on the given port
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/roby/interface/v1/tcp.rb', line 39
# Creates a new interface server on the given port.
#
# Builds an Interface for +app+, opens the listening socket through
# #open_tcp_server, queues the first asynchronous accept and registers
# #process_pending_requests as a propagation handler on the interface's
# execution engine.
#
# @param app the application object, handed to Interface.new
# @param host forwarded to #open_tcp_server
# @param port forwarded to #open_tcp_server
# @param server_fd forwarded to #open_tcp_server
def initialize(app, host: nil, port: DEFAULT_PORT, server_fd: nil)
  @app = app
  @interface = Interface.new(app)
  @server = open_tcp_server(host: host, port: port, server_fd: server_fd)
  @clients = []
  @abort_on_exception = false

  # Connections are accepted on a separate executor; the resulting future
  # is inspected by #process_pending_requests.
  @accept_executor = Concurrent::CachedThreadPool.new
  @accept_future = queue_accept_future

  engine = interface.execution_engine
  @propagation_handler_id = engine.add_propagation_handler(
    description: "TCPServer#process_pending_requests",
    on_error: :ignore
  ) { process_pending_requests }
  @warn_about_disconnection = false
end
Instance Attribute Details
#clients ⇒ Array<Client> (readonly)
Returns the set of currently active clients.
15 16 17 |
# File 'lib/roby/interface/v1/tcp.rb', line 15
# @return [Array<Client>] the set of currently active clients
def clients
  @clients
end
#interface ⇒ Interface (readonly)
Returns the interface object we give access to.
11 12 13 |
# File 'lib/roby/interface/v1/tcp.rb', line 11
# @return [Interface] the interface object we give access to
def interface
  @interface
end
#server ⇒ ::TCPServer (readonly)
Returns the TCP server we are accepting from.
13 14 15 |
# File 'lib/roby/interface/v1/tcp.rb', line 13
# @return [::TCPServer] the TCP server we are accepting from
def server
  @server
end
Instance Method Details
#client_count(handshake: true) ⇒ Integer
Number of clients connected to this server
99 100 101 102 103 |
# File 'lib/roby/interface/v1/tcp.rb', line 99
# Number of clients connected to this server.
#
# @param handshake [Boolean] when true, only clients whose
#   +performed_handshake?+ is truthy are counted; when false, every
#   registered client is counted
# @return [Integer]
def client_count(handshake: true)
  return @clients.size unless handshake

  @clients.count(&:performed_handshake?)
end
#close ⇒ Object
Closes this server
154 155 156 157 158 159 160 161 162 163 |
# File 'lib/roby/interface/v1/tcp.rb', line 154
# Closes this server.
#
# Closes every client that is still open and empties the client list,
# closes the listening socket if it is still open, shuts down the accept
# executor and unregisters the propagation handler installed by the
# constructor.
def close
  clients.each do |client|
    next if client.closed?

    client.close
  end
  clients.clear

  listener = server
  listener.close if listener && !listener.closed?

  @accept_executor.shutdown
  engine = interface.execution_engine
  engine.remove_propagation_handler(@propagation_handler_id)
end
#create_server(socket) ⇒ Server
Creates a server object that will manage the replies on a particular TCP socket
90 91 92 |
# File 'lib/roby/interface/v1/tcp.rb', line 90
# Creates a server object that will manage the replies on a particular
# TCP socket.
#
# The socket is wrapped in a DRobyChannel (second argument +false+; its
# meaning is defined by DRobyChannel, not visible here) and paired with
# this object's interface.
#
# @param socket the connected socket to serve
# @return [Server]
def create_server(socket)
  channel = DRobyChannel.new(socket, false)
  Server.new(channel, interface)
end
#has_client?(client) ⇒ Boolean
Whether the given client is handled by this server
166 167 168 |
# File 'lib/roby/interface/v1/tcp.rb', line 166
# Whether the given client is handled by this server.
#
# @param client the client to look up among the registered clients
# @return [Boolean] true if a registered client is equal (+==+) to +client+
def has_client?(client)
  @clients.any? { |known| known == client }
end
#ip_address ⇒ String
Returns the address this interface is bound to.
24 25 26 |
# File 'lib/roby/interface/v1/tcp.rb', line 24
# @return [String] the address this interface is bound to, as reported
#   by the listening socket's local address
def ip_address
  bound_address = server.local_address
  bound_address.ip_address
end
#ip_port ⇒ Integer
Returns the port on which this interface runs.
29 30 31 |
# File 'lib/roby/interface/v1/tcp.rb', line 29
# @return [Integer] the port on which this interface runs, as reported
#   by the listening socket's local address
def ip_port
  bound_address = server.local_address
  bound_address.ip_port
end
#open_tcp_server(host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/roby/interface/v1/tcp.rb', line 58
# Returns the listening socket for this interface.
#
# When +server_fd+ is given, the existing descriptor is wrapped with
# ::TCPServer.for_fd and +host+/+port+ are ignored. Otherwise a new
# ::TCPServer is created for +host+ and +port+.
#
# @param host passed to ::TCPServer.new
# @param port passed to ::TCPServer.new
# @param server_fd descriptor of an already-open server socket, if any
# @return [::TCPServer]
# @raise [Errno::EADDRINUSE] when ::TCPServer.new raises TypeError
def open_tcp_server(host: nil, port: DEFAULT_PORT, server_fd: nil)
  if server_fd
    ::TCPServer.for_fd(server_fd)
  else
    begin
      ::TCPServer.new(host, port)
    rescue TypeError
      # A TypeError from the socket constructor is reported to callers as
      # an address-in-use error.
      raise Errno::EADDRINUSE, "#{port} already in use"
    end
  end
end
#port ⇒ Integer
Returns the port this server is bound to
78 79 80 81 82 83 84 |
# File 'lib/roby/interface/v1/tcp.rb', line 78
# Returns the port this server is bound to.
#
# Emits a deprecation warning through Roby.warn_deprecated, then
# delegates to #ip_port.
#
# @deprecated use {#ip_port} instead
# @return [Integer]
def port
  message = "Interface::TCPServer#port is deprecated in favor "\
            "of #ip_port to match ruby's Addrinfo API"
  Roby.warn_deprecated(message)
  ip_port
end
#process_pending_requests ⇒ Object
Process all incoming connection requests
The new clients are added into the Roby event loop
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/roby/interface/v1/tcp.rb', line 108
# Process all incoming connection requests.
#
# If the pending accept future was fulfilled, the accepted socket becomes
# a new client and another accept is queued; if it was rejected, its
# reason is raised. Every client is then polled. A client whose poll
# raises is closed and dropped from the list; errors other than ComError
# are either collected (when abort_on_exception? is true) or logged.
#
# Nothing propagates out of this method: any exception raised here is
# either registered as a framework error on the application's execution
# engine (abort_on_exception?) or logged as a warning.
def process_pending_requests
  if @accept_future.rejected?
    raise @accept_future.reason
  elsif @accept_future.fulfilled?
    clients << create_server(@accept_future.value)
    @accept_future = queue_accept_future
  end

  fatal_errors = []
  clients.delete_if do |client|
    begin
      client.poll
      false
    rescue Exception => poll_error
      client.close
      if warn_about_disconnection?
        Roby::Interface.warn(
          "disconnecting from #{client.client_id}"
        )
      end

      # Communication errors only drop the client; anything else is also
      # reported.
      unless poll_error.kind_of?(ComError)
        if abort_on_exception?
          fatal_errors << poll_error
        else
          Roby.log_exception_with_backtrace(
            poll_error, Roby::Interface, :warn
          )
        end
      end
      true
    end
  end

  # Only the first collected error is re-raised (and then handled below).
  raise fatal_errors.first unless fatal_errors.empty?
rescue Exception => e
  if abort_on_exception?
    @app.execution_engine
        .add_framework_error(e, "Interface::TCPServer")
  else
    Roby.log_exception_with_backtrace(e, Roby, :warn)
  end
end
#queue_accept_future ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/roby/interface/v1/tcp.rb', line 67
# Schedules an accept on the listening socket on the accept executor.
#
# The future's value is the accepted socket, with TCP_NODELAY set on it
# before it is handed back.
#
# @return the object returned by Concurrent::Future.execute
def queue_accept_future
  Concurrent::Future.execute(executor: @accept_executor) do
    @server.accept.tap do |socket|
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
    end
  end
end