Class: Roby::Interface::V2::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/server.rb

Overview

The server-side object allowing to access an interface (e.g. a Roby app) through any communication channel

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, interface, main_thread: Thread.current) ⇒ Server

Returns a new instance of Server.

Parameters:

  • io (Channel)

    a channel to the server

  • interface (Interface)

    the interface object we give remote access to



23
24
25
26
27
28
29
30
# File 'lib/roby/interface/v2/server.rb', line 23

def initialize(io, interface, main_thread: Thread.current)
    @notifications_enabled = true
    @io = io
    @interface = interface
    @main_thread = main_thread
    @pending_packets = Queue.new
    @performed_handshake = false
end

Instance Attribute Details

#client_idString (readonly)

Returns a string that allows the user to identify the client.

Returns:

  • (String)

    a string that allows the user to identify the client



14
15
16
# File 'lib/roby/interface/v2/server.rb', line 14

def client_id
  @client_id
end

#interfaceInterface (readonly)

Returns the interface object we are giving access to.

Returns:

  • (Interface)

    the interface object we are giving access to



12
13
14
# File 'lib/roby/interface/v2/server.rb', line 12

def interface
  @interface
end

#ioChannel (readonly)

Returns the IO to the client.

Returns:

  • (Channel)

    the IO to the client



10
11
12
# File 'lib/roby/interface/v2/server.rb', line 10

def io
  @io
end

Instance Method Details

#closeObject



118
119
120
121
# File 'lib/roby/interface/v2/server.rb', line 118

def close
    io.close
    @listeners&.dispose
end

#closed?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/roby/interface/v2/server.rb', line 114

def closed?
    io.closed?
end

#disable_notificationsObject



110
111
112
# File 'lib/roby/interface/v2/server.rb', line 110

def disable_notifications
    self.notifications_enabled = false
end

#enable_notificationsObject



106
107
108
# File 'lib/roby/interface/v2/server.rb', line 106

def enable_notifications
    self.notifications_enabled = true
end

#flush_pending_packetsObject

Flush packets queued from #queue_packet



76
77
78
79
80
81
82
83
84
# File 'lib/roby/interface/v2/server.rb', line 76

def flush_pending_packets
    packets = []
    until @pending_packets.empty?
        packets << @pending_packets.pop
    end
    packets.each do |p|
        write_packet(p, defer_exceptions: true)
    end
end

#handshake(id, commands) ⇒ Object



90
91
92
93
94
95
96
97
98
99
# File 'lib/roby/interface/v2/server.rb', line 90

def handshake(id, commands)
    @client_id = id
    Roby::Interface.info "new interface client: #{id}"
    result = commands.each_with_object({}) do |s, result|
        result[s] = interface.send(s)
    end
    @performed_handshake = true
    listen_to_notifications
    result
end

#has_deferred_exception?Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/roby/interface/v2/server.rb', line 144

def has_deferred_exception?
    @deferred_exception
end

#listen_to_notificationsObject

Listen to notifications on the underlying interface



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/roby/interface/v2/server.rb', line 33

def listen_to_notifications
    listeners = []
    listeners << @interface.on_cycle_end do
        write_packet(
            [
                :cycle_end,
                [@interface.execution_engine.cycle_index,
                 @interface.execution_engine.cycle_start]
            ], defer_exceptions: true)
    end
    listeners << @interface.on_notification do |*args|
        if notifications_enabled?
            queue_packet([:notification, args])
        elsif Thread.current == @main_thread
            flush_pending_packets
        end
    end
    listeners << @interface.on_ui_event do |*args|
        queue_packet([:ui_event, args])
    end
    listeners << @interface.on_job_notification do |*args|
        write_packet([:job_progress, args], defer_exceptions: true)
    end
    listeners << @interface.on_exception do |*args|
        write_packet([:exception, args], defer_exceptions: true)
    end
    @listeners = Roby.disposable(*listeners)
end

#notifications_enabled?Boolean

Returns whether the messages should be forwarded to our clients.

Returns:

  • (Boolean)

    whether the messages should be forwarded to our clients



18
# File 'lib/roby/interface/v2/server.rb', line 18

attr_predicate :notifications_enabled?, true

#performed_handshake?Boolean

Whether the remote side already called #handshake

Returns:

  • (Boolean)


102
103
104
# File 'lib/roby/interface/v2/server.rb', line 102

def performed_handshake?
    @performed_handshake
end

#pollObject

Process one command from the client, and send the reply

Raises:

  • (@deferred_exception)


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/roby/interface/v2/server.rb', line 162

def poll
    raise @deferred_exception if has_deferred_exception?

    path, m, args, keywords = io.read_packet
    return unless m

    begin
        reply =
            if m == :process_batch
                process_batch(path, args.first)
            else
                process_call(path, m, args, keywords)
            end

        true
    rescue Exception => e
        write_packet([:bad_call, e])
        return
    end

    begin
        write_packet([:reply, reply])
    rescue ComError
        raise
    rescue Exception => e
        write_packet([:protocol_error, e])
        raise
    end
end

#process_batch(path, calls) ⇒ Object



123
124
125
126
127
# File 'lib/roby/interface/v2/server.rb', line 123

def process_batch(path, calls)
    calls.map do |p, m, a, kw|
        process_call(path + p, m, a, kw)
    end
end

#process_call(path, name, args, keywords) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/roby/interface/v2/server.rb', line 129

def process_call(path, name, args, keywords)
    if path.empty? && respond_to?(name)
        send(name, *args, **keywords)
    else
        process_interface_call(path, name, args, keywords)
    end
end

#process_interface_call(path, name, args, keywords) ⇒ Object



137
138
139
140
141
142
# File 'lib/roby/interface/v2/server.rb', line 137

def process_interface_call(path, name, args, keywords)
    receiver = path.inject(interface) do |obj, subcommand|
        obj.send(subcommand)
    end
    receiver.send(name, *args, **keywords)
end

#queue_packet(call) ⇒ Object

Write or queue a call, depending on whether the current thread is the main thread

Time ordering between out-of-thread and in-thread packets is not guaranteed, so this can only be used in cases where it does not matter.



67
68
69
70
71
72
73
# File 'lib/roby/interface/v2/server.rb', line 67

def queue_packet(call)
    if Thread.current == @main_thread
        write_packet(call, defer_exceptions: true)
    else
        @pending_packets << call
    end
end

#to_ioObject



86
87
88
# File 'lib/roby/interface/v2/server.rb', line 86

def to_io
    io.to_io
end

#write_packet(call, defer_exceptions: false) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/roby/interface/v2/server.rb', line 148

def write_packet(call, defer_exceptions: false)
    return if has_deferred_exception?

    flush_pending_packets
    io.write_packet(call)
rescue Exception => e
    if defer_exceptions
        @deferred_exception = e
    else
        raise
    end
end