Class: Roby::Interface::V2::Server
- Defined in:
- lib/roby/interface/v2/server.rb
Overview
The server-side object that gives access to an interface (e.g. a Roby app) through any communication channel.
Instance Attribute Summary
- #client_id ⇒ String (readonly): A string that allows the user to identify the client.
- #interface ⇒ Interface (readonly): The interface object we are giving access to.
- #io ⇒ Channel (readonly): The IO to the client.
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #disable_notifications ⇒ Object
- #enable_notifications ⇒ Object
- #flush_pending_packets ⇒ Object: Flush packets queued from #queue_packet.
- #handshake(id, commands) ⇒ Object
- #has_deferred_exception? ⇒ Boolean
- #initialize(io, interface, main_thread: Thread.current) ⇒ Server (constructor): A new instance of Server.
- #listen_to_notifications ⇒ Object: Listen to notifications on the underlying interface.
- #notifications_enabled? ⇒ Boolean: Whether the messages should be forwarded to our clients.
- #performed_handshake? ⇒ Boolean: Whether the remote side already called #handshake.
- #poll ⇒ Object: Process one command from the client, and send the reply.
- #process_batch(path, calls) ⇒ Object
- #process_call(path, name, args, keywords) ⇒ Object
- #process_interface_call(path, name, args, keywords) ⇒ Object
- #queue_packet(call) ⇒ Object: Write or queue a call, depending on whether the current thread is the main thread.
- #to_io ⇒ Object
- #write_packet(call, defer_exceptions: false) ⇒ Object
Constructor Details
#initialize(io, interface, main_thread: Thread.current) ⇒ Server
Returns a new instance of Server.
# File 'lib/roby/interface/v2/server.rb', line 23
def initialize(io, interface, main_thread: Thread.current)
    @notifications_enabled = true
    @io = io
    @interface = interface
    @main_thread = main_thread
    @pending_packets = Queue.new
    @performed_handshake = false
end
Instance Attribute Details
#client_id ⇒ String (readonly)
Returns a string that allows the user to identify the client.
# File 'lib/roby/interface/v2/server.rb', line 14
def client_id
    @client_id
end
#interface ⇒ Interface (readonly)
Returns the interface object we are giving access to.
# File 'lib/roby/interface/v2/server.rb', line 12
def interface
    @interface
end
#io ⇒ Channel (readonly)
Returns the IO to the client.
# File 'lib/roby/interface/v2/server.rb', line 10
def io
    @io
end
Instance Method Details
#close ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 118
def close
    io.close
    @listeners&.dispose
end
#closed? ⇒ Boolean
# File 'lib/roby/interface/v2/server.rb', line 114
def closed?
    io.closed?
end
#disable_notifications ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 110
def disable_notifications
    self.notifications_enabled = false
end
#enable_notifications ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 106
def enable_notifications
    self.notifications_enabled = true
end
#flush_pending_packets ⇒ Object
Flush packets queued from #queue_packet.
# File 'lib/roby/interface/v2/server.rb', line 76
def flush_pending_packets
    packets = []
    until @pending_packets.empty?
        packets << @pending_packets.pop
    end
    packets.each do |p|
        write_packet(p, defer_exceptions: true)
    end
end
#handshake(id, commands) ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 90
def handshake(id, commands)
    @client_id = id
    Roby::Interface.info "new interface client: #{id}"
    result = commands.each_with_object({}) do |s, result|
        result[s] = interface.send(s)
    end
    @performed_handshake = true
    listen_to_notifications
    result
end
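The result-building step of the handshake can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `FakeInterface` with two zero-argument queries; neither the class nor the helper `handshake_results` exists in Roby.

```ruby
# Hypothetical interface exposing two zero-argument queries.
# These names are illustrative only and are not part of Roby.
class FakeInterface
    def actions
        %i[go stop]
    end

    def jobs
        {}
    end
end

# Call each requested command on the interface and collect the
# return values in a hash keyed by command name, as #handshake does.
def handshake_results(interface, commands)
    commands.each_with_object({}) do |name, results|
        results[name] = interface.send(name)
    end
end

p handshake_results(FakeInterface.new, %i[actions jobs])
```

The client thus gets, in a single round trip, the initial value of every command it listed.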
#has_deferred_exception? ⇒ Boolean
# File 'lib/roby/interface/v2/server.rb', line 144
def has_deferred_exception?
    @deferred_exception
end
#listen_to_notifications ⇒ Object
Listen to notifications on the underlying interface.
# File 'lib/roby/interface/v2/server.rb', line 33
def listen_to_notifications
    listeners = []
    listeners << @interface.on_cycle_end do
        write_packet(
            [
                :cycle_end,
                [@interface.execution_engine.cycle_index,
                 @interface.execution_engine.cycle_start]
            ], defer_exceptions: true)
    end
    listeners << @interface.on_notification do |*args|
        if notifications_enabled?
            queue_packet([:notification, args])
        elsif Thread.current == @main_thread
            flush_pending_packets
        end
    end
    listeners << @interface.on_ui_event do |*args|
        queue_packet([:ui_event, args])
    end
    listeners << @interface.on_job_notification do |*args|
        write_packet([:job_progress, args], defer_exceptions: true)
    end
    listeners << @interface.on_exception do |*args|
        write_packet([:exception, args], defer_exceptions: true)
    end
    @listeners = Roby.disposable(*listeners)
end
#notifications_enabled? ⇒ Boolean
Returns whether the messages should be forwarded to our clients.
# File 'lib/roby/interface/v2/server.rb', line 18
attr_predicate :notifications_enabled?, true
#performed_handshake? ⇒ Boolean
Whether the remote side already called #handshake.
# File 'lib/roby/interface/v2/server.rb', line 102
def performed_handshake?
    @performed_handshake
end
#poll ⇒ Object
Process one command from the client, and send the reply.
# File 'lib/roby/interface/v2/server.rb', line 162
def poll
    raise @deferred_exception if has_deferred_exception?

    path, m, args, keywords = io.read_packet
    return unless m

    begin
        reply =
            if m == :process_batch
                process_batch(path, args.first)
            else
                process_call(path, m, args, keywords)
            end

        true
    rescue Exception => e
        write_packet([:bad_call, e])
        return
    end

    begin
        write_packet([:reply, reply])
    rescue ComError
        raise
    rescue Exception => e
        write_packet([:protocol_error, e])
        raise
    end
end
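To make the request/reply flow easier to follow, here is a simplified sketch of one poll step against an in-memory channel. `FakeChannel`, `Calculator` and `poll_once` are hypothetical names introduced for illustration. The sketch leaves out batch handling, path resolution and the `:protocol_error` branch, and it rescues `StandardError` where the listing above rescues `Exception`.

```ruby
# Hypothetical in-memory stand-in for the channel object.
class FakeChannel
    attr_reader :sent

    def initialize(incoming)
        @incoming = incoming
        @sent = []
    end

    # Returns the next [path, method, args, keywords] packet, or nil
    def read_packet
        @incoming.shift
    end

    def write_packet(packet)
        @sent << packet
    end
end

# Hypothetical call target, playing the role of the interface.
class Calculator
    def add(a, b)
        a + b
    end
end

# Read one command, run it, and answer with [:reply, value]
# or with [:bad_call, exception] if the call raised.
def poll_once(io, target)
    _path, m, args, keywords = io.read_packet
    return unless m

    begin
        reply = target.public_send(m, *args, **keywords)
    rescue StandardError => e
        io.write_packet([:bad_call, e])
        return
    end
    io.write_packet([:reply, reply])
end

io = FakeChannel.new([[[], :add, [1, 2], {}], [[], :missing, [], {}]])
calc = Calculator.new
3.times { poll_once(io, calc) }
p io.sent.map(&:first)
```

The third call finds no packet and returns without writing anything, which mirrors the `return unless m` guard.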
#process_batch(path, calls) ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 123
def process_batch(path, calls)
    calls.map do |p, m, a, kw|
        process_call(path + p, m, a, kw)
    end
end
#process_call(path, name, args, keywords) ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 129
def process_call(path, name, args, keywords)
    if path.empty? && respond_to?(name)
        send(name, *args, **keywords)
    else
        process_interface_call(path, name, args, keywords)
    end
end
#process_interface_call(path, name, args, keywords) ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 137
def process_interface_call(path, name, args, keywords)
    receiver = path.inject(interface) do |obj, subcommand|
        obj.send(subcommand)
    end
    receiver.send(name, *args, **keywords)
end
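The path is walked from the interface one method name at a time, and the final call is made on whatever object the walk ends on. The sketch below reproduces that resolution as a free function. `Arm`, `Robot` and `resolve_and_call` are hypothetical stand-ins and do not come from Roby.

```ruby
# Hypothetical nested objects standing in for an interface and a
# sub-interface.
Arm = Struct.new(:position) do
    def move_to(x, speed: 1)
        self.position = x
        "moved to #{x} at speed #{speed}"
    end
end
Robot = Struct.new(:arm)

# Same resolution logic as process_interface_call: fold the path
# over the root object, then call the method on the final receiver.
def resolve_and_call(root, path, name, args, keywords)
    receiver = path.inject(root) { |obj, subcommand| obj.send(subcommand) }
    receiver.send(name, *args, **keywords)
end

robot = Robot.new(Arm.new(0))
puts resolve_and_call(robot, [:arm], :move_to, [5], { speed: 2 })
```

With an empty path, `inject` returns the root unchanged, so the call goes to the interface itself.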
#queue_packet(call) ⇒ Object
Write or queue a call, depending on whether the current thread is the main thread.
Time ordering between out-of-thread and in-thread packets is not guaranteed, so use this only where ordering does not matter.
# File 'lib/roby/interface/v2/server.rb', line 67
def queue_packet(call)
    if Thread.current == @main_thread
        write_packet(call, defer_exceptions: true)
    else
        @pending_packets << call
    end
end
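The write-or-queue pattern, and the ordering caveat that comes with it, can be observed with a small stand-alone sketch. `PacketSink` is a hypothetical class that records packets in an array instead of writing to a channel. Unlike the real #write_packet, it does not flush the queue before each direct write, so the reordering is easier to see here than it would be on a live server.

```ruby
# Hypothetical stand-in for the write-or-queue pattern.
class PacketSink
    attr_reader :written

    def initialize(main_thread: Thread.current)
        @main_thread = main_thread
        @pending = Queue.new
        @written = []
    end

    # Write directly from the main thread, queue from any other thread
    def queue_packet(call)
        if Thread.current == @main_thread
            @written << call
        else
            @pending << call
        end
    end

    # Drain what other threads queued; meant to run on the main thread
    def flush_pending_packets
        @written << @pending.pop until @pending.empty?
    end
end

sink = PacketSink.new
Thread.new { sink.queue_packet([:notification, "from worker"]) }.join
sink.queue_packet([:notification, "from main"])
sink.flush_pending_packets
p sink.written
```

Here the worker's packet was submitted first but is recorded second, because it waits in the queue until the main thread flushes it.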
#to_io ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 86
def to_io
    io.to_io
end
#write_packet(call, defer_exceptions: false) ⇒ Object
# File 'lib/roby/interface/v2/server.rb', line 148
def write_packet(call, defer_exceptions: false)
    return if has_deferred_exception?

    flush_pending_packets
    io.write_packet(call)
rescue Exception => e
    if defer_exceptions
        @deferred_exception = e
    else
        raise
    end
end
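The `defer_exceptions` flag lets callbacks record a write failure without raising from inside the callback; the stored exception is then raised by the next #poll. A minimal sketch of that pattern follows, assuming a hypothetical `BrokenChannel` whose writes always fail and a hypothetical `DeferringWriter` class. It rescues `StandardError` rather than `Exception`, and it omits the pending-packet flush.

```ruby
# Hypothetical channel whose writes always fail.
class BrokenChannel
    def write_packet(_call)
        raise IOError, "connection lost"
    end
end

# Hypothetical reduction of the deferred-exception pattern.
class DeferringWriter
    def initialize(io)
        @io = io
        @deferred_exception = nil
    end

    def has_deferred_exception?
        !@deferred_exception.nil?
    end

    # Either re-raise a write failure immediately, or store it so
    # that a later poll can raise it.
    def write_packet(call, defer_exceptions: false)
        return if has_deferred_exception?

        @io.write_packet(call)
    rescue StandardError => e
        raise unless defer_exceptions

        @deferred_exception = e
    end

    # Surface a previously deferred failure
    def poll
        raise @deferred_exception if has_deferred_exception?
    end
end

writer = DeferringWriter.new(BrokenChannel.new)
writer.write_packet([:notification, []], defer_exceptions: true)
begin
    writer.poll
rescue IOError => e
    puts "poll raised: #{e.message}"
end
```

Once an exception is stored, later writes are skipped, so the failure is reported once, from the polling side.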