Class: Roby::Interface::V1::TCPServer

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v1/tcp.rb

Overview

An object that publishes a Roby interface using a TCP server

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ TCPServer

Creates a new interface server on the given port

Parameters:

  • host (String) (defaults to: nil)

    the host to bind the TCP server on

  • port (Integer) (defaults to: DEFAULT_PORT)

    the port to listen on

  • server_fd (Integer, nil) (defaults to: nil)

    a file descriptor for a TCP server that should be used as-is. If set, it supersedes both host and port



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/roby/interface/v1/tcp.rb', line 39

def initialize(app, host: nil, port: DEFAULT_PORT, server_fd: nil)
    @app = app
    @interface = Interface.new(app)
    @server =
        open_tcp_server(host: host, port: port, server_fd: server_fd)
    @clients = []
    @abort_on_exception = false
    @accept_executor = Concurrent::CachedThreadPool.new
    @accept_future = queue_accept_future
    @propagation_handler_id =
        interface
        .execution_engine
        .add_propagation_handler(
            description: "TCPServer#process_pending_requests",
            on_error: :ignore
        ) { process_pending_requests }
    @warn_about_disconnection = false
end

Instance Attribute Details

#clientsArray<Client> (readonly)

Returns set of currently active clients.

Returns:

  • (Array<Client>)

    set of currently active clients



15
16
17
# File 'lib/roby/interface/v1/tcp.rb', line 15

def clients
  @clients
end

#interfaceInterface (readonly)

Returns the interface object we give access to.

Returns:

  • (Interface)

    the interface object we give access to



11
12
13
# File 'lib/roby/interface/v1/tcp.rb', line 11

def interface
  @interface
end

#server::TCPServer (readonly)

Returns the TCP server we are accepting from.

Returns:

  • (::TCPServer)

    the TCP server we are accepting from



13
14
15
# File 'lib/roby/interface/v1/tcp.rb', line 13

def server
  @server
end

Instance Method Details

#client_count(handshake: true) ⇒ Integer

Number of clients connected to this server

Parameters:

  • handshake (Boolean) (defaults to: true)

    if true, return only the clients that have performed their handshake

Returns:

  • (Integer)


99
100
101
102
103
# File 'lib/roby/interface/v1/tcp.rb', line 99

def client_count(handshake: true)
    @clients.count do |c|
        !handshake || c.performed_handshake?
    end
end

#closeObject

Closes this server



154
155
156
157
158
159
160
161
162
163
# File 'lib/roby/interface/v1/tcp.rb', line 154

def close
    clients.each do |c|
        c.close unless c.closed?
    end
    clients.clear
    server.close if server && !server.closed?
    @accept_executor.shutdown
    interface.execution_engine
             .remove_propagation_handler(@propagation_handler_id)
end

#create_server(socket) ⇒ Server

Creates a server object that will manage the replies on a particular TCP socket

Returns:



90
91
92
# File 'lib/roby/interface/v1/tcp.rb', line 90

def create_server(socket)
    Server.new(DRobyChannel.new(socket, false), interface)
end

#has_client?(client) ⇒ Boolean

Whether the given client is handled by this server

Returns:

  • (Boolean)


166
167
168
# File 'lib/roby/interface/v1/tcp.rb', line 166

def has_client?(client)
    @clients.include?(client)
end

#ip_addressString

Returns the address this interface is bound to.

Returns:

  • (String)

    the address this interface is bound to



24
25
26
# File 'lib/roby/interface/v1/tcp.rb', line 24

def ip_address
    server.local_address.ip_address
end

#ip_portInteger

Returns the port on which this interface runs.

Returns:

  • (Integer)

    the port on which this interface runs



29
30
31
# File 'lib/roby/interface/v1/tcp.rb', line 29

def ip_port
    server.local_address.ip_port
end

#open_tcp_server(host: nil, port: DEFAULT_PORT, server_fd: nil) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/roby/interface/v1/tcp.rb', line 58

def open_tcp_server(host: nil, port: DEFAULT_PORT, server_fd: nil)
    return ::TCPServer.for_fd(server_fd) if server_fd

    begin ::TCPServer.new(host, port)
    rescue TypeError
        raise Errno::EADDRINUSE, "#{port} already in use"
    end
end

#portInteger

Returns the port this server is bound to

Returns:

  • (Integer)


78
79
80
81
82
83
84
# File 'lib/roby/interface/v1/tcp.rb', line 78

def port
    Roby.warn_deprecated(
        "Interface::TCPServer#port is deprecated in favor "\
        "of #ip_port to match ruby's Addrinfo API"
    )
    ip_port
end

#process_pending_requestsObject

Process all incoming connection requests

The new clients are added into the Roby event loop



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/roby/interface/v1/tcp.rb', line 108

def process_pending_requests
    if @accept_future.rejected?
        raise @accept_future.reason
    elsif @accept_future.fulfilled?
        clients << create_server(@accept_future.value)
        @accept_future = queue_accept_future
    end

    exceptions = []
    clients.delete_if do |client|
        begin
            client.poll
            false
        rescue Exception => e
            client.close

            if warn_about_disconnection?
                Roby::Interface.warn(
                    "disconnecting from #{client.client_id}"
                )
            end

            next(true) if e.kind_of?(ComError)

            if abort_on_exception?
                exceptions << e
            else
                Roby.log_exception_with_backtrace(
                    e, Roby::Interface, :warn
                )
            end
            true
        end
    end

    raise exceptions.first unless exceptions.empty?
rescue Exception => e
    if abort_on_exception?
        @app.execution_engine
            .add_framework_error(e, "Interface::TCPServer")
    else
        Roby.log_exception_with_backtrace(e, Roby, :warn)
    end
end

#queue_accept_futureObject



67
68
69
70
71
72
73
# File 'lib/roby/interface/v1/tcp.rb', line 67

def queue_accept_future
    Concurrent::Future.execute(executor: @accept_executor) do
        socket = @server.accept
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
        socket
    end
end