Class: Roby::Interface::TCPServer

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/tcp.rb

Overview

An object that publishes a Roby interface using a TCP server

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, host: nil, port: Roby::Interface::DEFAULT_PORT) ⇒ TCPServer

Creates a new interface server bound to the given host and port

Parameters:

  • port (Integer) (defaults to: Roby::Interface::DEFAULT_PORT)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/roby/interface/tcp.rb', line 32

# Creates a new interface server listening on the given host and port
#
# Besides opening the listening socket, this queues the first asynchronous
# accept and registers a propagation handler on the interface's execution
# engine that calls {#process_pending_requests}.
#
# @param app the application object; it is passed to Interface.new and its
#   execution engine receives the framework errors
# @param host host passed as-is to ::TCPServer.new (nil by default)
# @param [Integer] port the port to listen on
# @raise [Errno::EADDRINUSE] if ::TCPServer.new raises a TypeError
def initialize(app, host: nil, port: Roby::Interface::DEFAULT_PORT)
    @app = app
    @interface = Interface.new(app)
    begin
        @server = ::TCPServer.new(host, port)
    rescue TypeError
        # NOTE(review): a TypeError is reported as "address in use";
        # presumably a workaround for how ::TCPServer fails - confirm
        raise Errno::EADDRINUSE, "#{port} already in use"
    end
    @clients = []
    @abort_on_exception = true
    @accept_executor = Concurrent::CachedThreadPool.new
    @accept_future = queue_accept_future

    engine = interface.execution_engine
    @propagation_handler_id = engine.add_propagation_handler(
        description: 'TCPServer#process_pending_requests',
        on_error: :ignore
    ) { process_pending_requests }
    @warn_about_disconnection = false
end

Instance Attribute Details

#clients ⇒ Array<Client> (readonly)

Returns set of currently active clients.

Returns:

  • (Array<Client>)

    set of currently active clients



12
13
14
# File 'lib/roby/interface/tcp.rb', line 12

# @return [Array<Client>] set of currently active clients
def clients
  @clients
end

#interface ⇒ Interface (readonly)

Returns the interface object we give access to.

Returns:

  • (Interface)

    the interface object we give access to



8
9
10
# File 'lib/roby/interface/tcp.rb', line 8

# @return [Interface] the interface object we give access to
def interface
  @interface
end

#server ⇒ ::TCPServer (readonly)

Returns the TCP server we are accepting from.

Returns:

  • (::TCPServer)

    the TCP server we are accepting from



10
11
12
# File 'lib/roby/interface/tcp.rb', line 10

# @return [::TCPServer] the TCP server we are accepting from
def server
  @server
end

Instance Method Details

#close ⇒ Object

Closes this server



122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/roby/interface/tcp.rb', line 122

# Shuts this server down
#
# Every client that is still open is closed and the client list is emptied,
# then the listening socket is closed (if it is still open), the accept
# executor is shut down and the propagation handler registered by the
# constructor is removed from the execution engine.
def close
    clients.each { |client| client.close unless client.closed? }
    clients.clear

    server.close if server && !server.closed?

    @accept_executor.shutdown
    engine = interface.execution_engine
    engine.remove_propagation_handler(@propagation_handler_id)
end

#create_server(socket) ⇒ Server

Creates a server object that will manage the replies on a particular TCP socket

Returns:

  • (Server)


74
75
76
# File 'lib/roby/interface/tcp.rb', line 74

# Builds the server object that handles a single client connection
#
# The socket is wrapped in a DRobyChannel and handed, along with
# {#interface}, to a new Server.
#
# @param socket the client socket, as returned by the TCP server's accept
# @return [Server]
def create_server(socket)
    # NOTE(review): the meaning of the `false` flag is defined by
    # DRobyChannel, which is not visible here - confirm before changing it
    channel = DRobyChannel.new(socket, false)
    Server.new(channel, interface)
end

#has_client?(client) ⇒ Boolean

Whether the given client is handled by this server

Returns:

  • (Boolean)


138
139
140
# File 'lib/roby/interface/tcp.rb', line 138

# Tests whether a client is currently managed by this server
#
# @param client the client object to look for in {#clients}
# @return [Boolean] true if the client is in the list of active clients
def has_client?(client)
    clients.include?(client)
end

#ip_address ⇒ String

Returns the address this interface is bound to.

Returns:

  • (String)

    the address this interface is bound to



20
21
22
# File 'lib/roby/interface/tcp.rb', line 20

# The IP address the listening socket is bound to
#
# @return [String] the address, as reported by the server's local address
def ip_address
    local = server.local_address
    local.ip_address
end

#ip_port ⇒ Integer

Returns the port on which this interface runs.

Returns:

  • (Integer)

    the port on which this interface runs



25
26
27
# File 'lib/roby/interface/tcp.rb', line 25

# The TCP port the listening socket is bound to
#
# @return [Integer] the port, as reported by the server's local address
def ip_port
    local = server.local_address
    local.ip_port
end

#port ⇒ Integer

Returns the port this server is bound to

Returns:

  • (Integer)


64
65
66
67
68
# File 'lib/roby/interface/tcp.rb', line 64

# The port this server is bound to
#
# @deprecated use {#ip_port} instead
# @return [Integer]
def port
    message = "Interface::TCPServer#port is deprecated in favor of #ip_port to match ruby's Addrinfo API"
    Roby.warn_deprecated message
    ip_port
end

#process_pending_requests ⇒ Object

Processes all incoming connection requests and polls the connected clients

The new clients are added into the Roby event loop



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/roby/interface/tcp.rb', line 81

# Processes pending connection requests and polls every connected client
#
# If the pending accept has completed, the accepted socket is turned into a
# new client with {#create_server} and a new accept is queued. Each client
# is then polled. A client whose poll raises is closed and removed from
# {#clients}; a client whose poll succeeds is always kept.
#
# Exceptions raised in the method body (a failed accept, or a failing
# client) are caught here. ComError only causes the disconnection. For
# other errors, when abort_on_exception? is set, the first one is
# registered as a framework error on the application's execution engine;
# otherwise they are logged as warnings.
def process_pending_requests
    if @accept_future.rejected?
        raise @accept_future.reason
    elsif @accept_future.fulfilled?
        clients << create_server(@accept_future.value)
        @accept_future = queue_accept_future
    end

    exceptions = []
    clients.delete_if do |client|
        begin
            client.poll
            # The block's value decides whether the client is removed.
            # Do not let it depend on what #poll happens to return: only
            # a failure (handled below) disconnects a client.
            false
        rescue Exception => e
            client.close

            if warn_about_disconnection?
                Roby::Interface.warn "disconnecting from #{client.client_id}"
            end

            # Communication errors are an expected way for a client to
            # go away, they are not reported any further
            next(true) if e.kind_of?(ComError)

            if abort_on_exception?
                exceptions << e
            else
                Roby.log_exception_with_backtrace(e, Roby::Interface, :warn)
            end
            true
        end
    end

    # Only the first collected error is reported, through the rescue below
    raise exceptions.first unless exceptions.empty?
rescue Exception => e
    if abort_on_exception?
        @app.execution_engine.
            add_framework_error(e, "Interface::TCPServer")
    else
        Roby.log_exception_with_backtrace(e, Roby, :warn)
    end
end

#queue_accept_future ⇒ Object



53
54
55
56
57
58
59
# File 'lib/roby/interface/tcp.rb', line 53

# Queues an asynchronous accept on the listening socket
#
# The accept is run through Concurrent::Future on @accept_executor. The
# block's value is the accepted socket, on which TCP_NODELAY has been set.
#
# @return the object returned by Concurrent::Future.execute, which
#   {#process_pending_requests} later queries for completion
def queue_accept_future
    Concurrent::Future.execute(executor: @accept_executor) do
        @server.accept.tap do |client_socket|
            client_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
        end
    end
end