Class: Roby::Interface::TCPServer
Overview
An object that publishes a Roby interface using a TCP server
Instance Attribute Summary collapse
-
#clients ⇒ Array<Client>
readonly
Set of currently active clients.
-
#interface ⇒ Interface
readonly
The interface object we give access to.
-
#server ⇒ ::TCPServer
readonly
The TCP server we are accepting from.
Instance Method Summary collapse
-
#close ⇒ Object
Closes this server.
-
#create_server(socket) ⇒ Server
Creates a server object that will manage the replies on a particular TCP socket.
-
#has_client?(client) ⇒ Boolean
Whether the given client is handled by this server.
-
#initialize(app, host: nil, port: Roby::Interface::DEFAULT_PORT) ⇒ TCPServer
constructor
Creates a new interface server on the given port.
-
#ip_address ⇒ String
The address this interface is bound to.
-
#ip_port ⇒ Integer
The port on which this interface runs.
-
#port ⇒ Integer
Returns the port this server is bound to.
-
#process_pending_requests ⇒ Object
Process all incoming connection requests.
-
#queue_accept_future ⇒ Object
Constructor Details
#initialize(app, host: nil, port: Roby::Interface::DEFAULT_PORT) ⇒ TCPServer
Creates a new interface server on the given port
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/roby/interface/tcp.rb', line 32

# Creates a new interface server on the given port.
#
# Binds a ::TCPServer, starts the first asynchronous accept and registers
# {#process_pending_requests} as a propagation handler on the interface's
# execution engine.
#
# @param app the application object; it is wrapped in a new Interface and
#   kept for reporting framework errors
# @param host the address to bind to, handed unchanged to ::TCPServer.new
# @param port the port to bind to, handed unchanged to ::TCPServer.new
# @raise [Errno::EADDRINUSE] when ::TCPServer.new raises a TypeError
def initialize(app, host: nil, port: Roby::Interface::DEFAULT_PORT)
  @app = app
  @interface = Interface.new(app)

  begin
    @server = ::TCPServer.new(host, port)
  rescue TypeError
    # A TypeError from the bind is reported to the caller as "address in use"
    raise Errno::EADDRINUSE, "#{port} already in use"
  end

  @clients = []
  @abort_on_exception = true
  @warn_about_disconnection = false

  # The executor must exist before the first accept is queued on it
  @accept_executor = Concurrent::CachedThreadPool.new
  @accept_future = queue_accept_future

  engine = interface.execution_engine
  @propagation_handler_id = engine.add_propagation_handler(
    description: 'TCPServer#process_pending_requests',
    on_error: :ignore
  ) { process_pending_requests }
end
Instance Attribute Details
#clients ⇒ Array<Client> (readonly)
Returns the set of currently active clients.
12 13 14 |
# File 'lib/roby/interface/tcp.rb', line 12

# The set of currently active clients.
#
# @return [Array<Client>] the array held in @clients (not a copy)
def clients
  @clients
end
#interface ⇒ Interface (readonly)
Returns the interface object we give access to.
8 9 10 |
# File 'lib/roby/interface/tcp.rb', line 8

# The interface object this server gives access to.
#
# @return [Interface] the object held in @interface
def interface
  @interface
end
#server ⇒ ::TCPServer (readonly)
Returns the TCP server we are accepting from.
10 11 12 |
# File 'lib/roby/interface/tcp.rb', line 10

# The TCP server connections are accepted from.
#
# @return [::TCPServer] the object held in @server
def server
  @server
end
Instance Method Details
#close ⇒ Object
Closes this server
122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/roby/interface/tcp.rb', line 122

# Closes this server.
#
# Closes every client that is not already closed, empties the client list,
# closes the listening socket if it is still open, shuts down the accept
# executor and finally unregisters the propagation handler from the
# interface's execution engine.
#
# @return the result of remove_propagation_handler
def close
  clients.each { |client| client.close unless client.closed? }
  clients.clear

  server.close if server && !server.closed?

  @accept_executor.shutdown

  engine = interface.execution_engine
  engine.remove_propagation_handler(@propagation_handler_id)
end
#create_server(socket) ⇒ Server
Creates a server object that will manage the replies on a particular TCP socket
74 75 76 |
# File 'lib/roby/interface/tcp.rb', line 74

# Creates a server object that will manage the replies on a particular
# TCP socket.
#
# @param socket the connected socket, wrapped in a DRobyChannel
# @return [Server] a server bound to the new channel and to {#interface}
def create_server(socket)
  channel = DRobyChannel.new(socket, false)
  Server.new(channel, interface)
end
#has_client?(client) ⇒ Boolean
Whether the given client is handled by this server
138 139 140 |
# File 'lib/roby/interface/tcp.rb', line 138

# Whether the given client is handled by this server.
#
# @param client the object to look for in the client list
# @return [Boolean] true if @clients includes the given client
def has_client?(client)
  @clients.include?(client)
end
#ip_address ⇒ String
Returns the address this interface is bound to.
20 21 22 |
# File 'lib/roby/interface/tcp.rb', line 20

# The address this interface is bound to.
#
# @return [String] the ip_address of the server's local address
def ip_address
  local = server.local_address
  local.ip_address
end
#ip_port ⇒ Integer
Returns the port on which this interface runs.
25 26 27 |
# File 'lib/roby/interface/tcp.rb', line 25

# The port on which this interface runs.
#
# @return [Integer] the ip_port of the server's local address
def ip_port
  local = server.local_address
  local.ip_port
end
#port ⇒ Integer
Returns the port this server is bound to
64 65 66 67 68 |
# File 'lib/roby/interface/tcp.rb', line 64

# Returns the port this server is bound to.
#
# Emits a deprecation warning through Roby.warn_deprecated, then delegates
# to {#ip_port}.
#
# @deprecated use {#ip_port} instead
# @return [Integer] the value of {#ip_port}
def port
  message = "Interface::TCPServer#port is deprecated in favor "\
            "of #ip_port to match ruby's Addrinfo API"
  Roby.warn_deprecated(message)
  ip_port
end
#process_pending_requests ⇒ Object
Process all incoming connection requests
The new clients are added into the Roby event loop
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/roby/interface/tcp.rb', line 81

# Process all incoming connection requests.
#
# The new clients are added into the Roby event loop.
#
# The method first looks at the pending accept future: a rejected future has
# its reason raised, a fulfilled one is turned into a new client and a fresh
# accept is queued. It then polls every client and drops the ones whose poll
# raised. Any exception escaping the body (Exception, not only
# StandardError) is either reported as a framework error on the
# application's execution engine or logged, depending on
# abort_on_exception?.
def process_pending_requests
  begin
    if @accept_future.rejected?
      raise @accept_future.reason
    elsif @accept_future.fulfilled?
      clients << create_server(@accept_future.value)
      @accept_future = queue_accept_future
    end

    failures = []
    clients.delete_if do |client|
      begin
        # NOTE(review): on success the block yields poll's own return value,
        # so a truthy result removes the client without closing it —
        # confirm that poll is expected to return nil/false here.
        client.poll
      rescue Exception => poll_error
        client.close

        if warn_about_disconnection?
          Roby::Interface.warn "disconnecting from #{client.client_id}"
        end

        # ComError is neither collected nor logged; the client is just dropped
        unless poll_error.is_a?(ComError)
          if abort_on_exception?
            failures.push(poll_error)
          else
            Roby.log_exception_with_backtrace(
              poll_error, Roby::Interface, :warn
            )
          end
        end

        true
      end
    end

    # Only the first collected failure is propagated
    raise failures.first unless failures.empty?
  rescue Exception => error
    if abort_on_exception?
      @app.execution_engine.add_framework_error(error, "Interface::TCPServer")
    else
      Roby.log_exception_with_backtrace(error, Roby, :warn)
    end
  end
end
#queue_accept_future ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/roby/interface/tcp.rb', line 53

# Queues an asynchronous accept on the TCP server.
#
# The accept runs on @accept_executor; TCP_NODELAY is set on the accepted
# socket before it becomes the future's value.
#
# @return [Concurrent::Future] the future returned by
#   Concurrent::Future.execute for the accept block
def queue_accept_future
  Concurrent::Future.execute(executor: @accept_executor) do
    @server.accept.tap do |accepted|
      accepted.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
    end
  end
end