Class: Roby::Interface::V1::DRobyChannel
- Defined in:
- lib/roby/interface/v1/droby_channel.rb
Overview
A wrapper on top of raw IO that uses droby marshalling to communicate
Constant Summary collapse
- WEBSOCKET_CLASSES =
This is a workaround for very slow first-load behavior: these classes are auto-loaded, and loading them takes a very long time in multithreaded contexts.
[ WebSocket::Frame::Outgoing::Client, WebSocket::Frame::Outgoing::Server, WebSocket::Frame::Incoming::Client, WebSocket::Frame::Incoming::Server ].freeze
Instance Attribute Summary collapse
-
#io ⇒ #read_nonblock, #write
readonly
The channel that allows us to communicate to clients.
-
#marshaller ⇒ DRoby::Marshal
readonly
An object used to marshal or unmarshal objects to/from the connection.
-
#max_write_buffer_size ⇒ Object
readonly
The maximum byte count that the channel can hold on the write side until it bails out.
Instance Method Summary collapse
-
#client? ⇒ Boolean
True if the local process is the client side of the connection, false if it is the server side.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #eof? ⇒ Boolean
- #flush ⇒ Object
-
#initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25 * 1024**2) ⇒ DRobyChannel
constructor
A new instance of DRobyChannel.
-
#push_write_data(new_bytes = nil) ⇒ Boolean
Push queued data.
-
#read_packet(timeout = 0) ⇒ Object?
Read one packet from #io and unmarshal it.
-
#read_wait(timeout: nil) ⇒ Boolean
Wait until there is something to read on the channel.
- #reset_thread_guard(read_thread = nil, write_thread = nil) ⇒ Object
- #to_io ⇒ Object
- #unmarshal_packet(packet) ⇒ Object
- #write_buffer_size ⇒ Object
-
#write_packet(object) ⇒ void
Write one ruby object (usually an array) as a marshalled packet and send it to #io.
Constructor Details
#initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25 * 1024**2) ⇒ DRobyChannel
Returns a new instance of DRobyChannel.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 32

# Wrap a raw IO into a channel that exchanges marshalled Ruby objects
# framed as binary WebSocket packets.
#
# @param io [#fcntl, #sysread, #syswrite] the raw communication channel;
#   it is switched to non-blocking mode here
# @param client [Boolean] truthy if the local process is the client side
#   of the connection, falsy if it is the server side
# @param marshaller [DRoby::Marshal] object used to marshal/unmarshal the
#   objects that go through the channel
# @param max_write_buffer_size [Integer] size the pending write buffer
#   may reach before #push_write_data raises
def initialize(
  io, client,
  marshaller: DRoby::Marshal.new(auto_create_plans: true),
  max_write_buffer_size: 25 * 1024**2
)
  @io = io
  # NOTE(review): F_SETFL is given O_NONBLOCK alone instead of being OR-ed
  # with the current F_GETFL value — confirm that resetting the other
  # status flags is intended.
  @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @client = client

  # Frame class used by #write_packet to wrap outgoing payloads
  @websocket_packet =
    if client
      WebSocket::Frame::Outgoing::Client
    else
      WebSocket::Frame::Outgoing::Server
    end

  # Incremental decoder fed by #read_packet
  @incoming =
    if client
      WebSocket::Frame::Incoming::Client.new(type: :binary)
    else
      WebSocket::Frame::Incoming::Server.new(type: :binary)
    end

  @marshaller = marshaller
  @max_write_buffer_size = max_write_buffer_size
  @read_buffer = String.new
  @write_buffer = String.new

  # Thread guards: each one is latched on the first read/write and can be
  # reset with #reset_thread_guard. Both are initialised explicitly so the
  # read side matches the write side.
  @read_thread = nil
  @write_thread = nil
end
Instance Attribute Details
#io ⇒ #read_nonblock, #write (readonly)
Returns the channel that allows us to communicate to clients.
10 11 12 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 10

# The underlying IO object given to the constructor.
#
# @return [#read_nonblock, #write] the channel that allows us to
#   communicate to clients
def io
  @io
end
#marshaller ⇒ DRoby::Marshal (readonly)
Returns an object used to marshal or unmarshal objects to/from the connection.
17 18 19 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 17

# The marshalling helper given to the constructor.
#
# @return [DRoby::Marshal] an object used to marshal or unmarshal objects
#   to/from the connection
def marshaller
  @marshaller
end
#max_write_buffer_size ⇒ Object (readonly)
The maximum byte count that the channel can hold on the write side until it bails out
20 21 22 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 20

# The maximum byte count that the channel can hold on the write side
# until it bails out.
#
# @return [Object] the limit given to the constructor
def max_write_buffer_size
  @max_write_buffer_size
end
Instance Method Details
#client? ⇒ Boolean
Returns true if the local process is the client side of the connection, false if it is the server side.
14 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 14 attr_predicate :client? |
#close ⇒ Object
68 69 70 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 68

# Close the underlying IO.
#
# @return [Object] whatever the IO's #close returns
def close
  io.close
end
#closed? ⇒ Boolean
72 73 74 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 72

# Tell whether the underlying IO has been closed.
#
# @return [Boolean] the result of the IO's #closed?
def closed?
  io.closed?
end
#eof? ⇒ Boolean
76 77 78 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 76

# Tell whether the underlying IO reports end-of-file.
#
# @return [Boolean] the result of the IO's #eof?
def eof?
  io.eof?
end
#flush ⇒ Object
80 81 82 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 80

# Flush the underlying IO.
#
# Only the IO's own #flush is called; the channel's internal write buffer
# is drained by #push_write_data, not here.
#
# @return [Object] whatever the IO's #flush returns
def flush
  io.flush
end
#push_write_data(new_bytes = nil) ⇒ Boolean
Push queued data
The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 171

# Push queued data
#
# The write I/O is buffered. This method appends +new_bytes+ (if given) to
# the internal buffer, then attempts a single non-blocking write of the
# buffer and drops whatever was written from it.
#
# @param new_bytes [String, nil] data to queue before attempting the write
# @return [Boolean, nil] true if data is still queued after the write,
#   false if the buffer was emptied. NOTE(review): when the write would
#   block and the buffer is under the limit, the method returns nil even
#   though data is still queued — confirm callers expect this.
# @raise [InternalError] if called from a thread other than the one that
#   first wrote on this channel (see #reset_thread_guard)
# @raise [ComError] if the write fails, or if the write would block while
#   the buffer exceeds #max_write_buffer_size
def push_write_data(new_bytes = nil)
  @write_thread ||= Thread.current
  if @write_thread != Thread.current
    raise InternalError,
          "cross-thread access to droby channel: "\
          "from #{@write_thread} to #{Thread.current}"
  end

  @write_buffer.concat(new_bytes) if new_bytes
  written_bytes = io.syswrite(@write_buffer)

  # syswrite reports a byte count, so the buffer must be trimmed by bytes
  # rather than by characters
  @write_buffer = @write_buffer.byteslice(written_bytes..-1)
  !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
  # Nothing could be written right now: keep the data queued unless the
  # buffer has grown past the configured limit
  if @write_buffer.bytesize > max_write_buffer_size
    raise ComError,
          "droby_channel reached an internal buffer size of "\
          "#{@write_buffer.bytesize}, which is bigger than the limit "\
          "of #{max_write_buffer_size}, bailing out"
  end
rescue SystemCallError, IOError
  raise ComError, "broken communication channel"
rescue RuntimeError => e
  # Workaround what seems to be a Ruby bug ...
  if e.message =~ /can.t modify frozen IOError/
    raise ComError, "broken communication channel"
  else
    raise
  end
end
#read_packet(timeout = 0) ⇒ Object?
Read one packet from #io and unmarshal it
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 98

# Read one packet from #io and unmarshal it
#
# A packet already decoded by the incoming frame parser is returned
# immediately. Otherwise the method waits on the IO, feeds what it reads
# to the parser, and retries until a packet is available or the timeout
# expires.
#
# @param timeout [Numeric, nil] how long to wait for a packet, in seconds;
#   nil waits without a deadline
# @return [Object, nil] the unmarshalled packet, or nil if no full packet
#   was available before the timeout expired
# @raise [InternalError] if called from a thread other than the one that
#   first read from this channel (see #reset_thread_guard)
# @raise [ComError] if reading from the IO fails (including end-of-file)
def read_packet(timeout = 0)
  @read_thread ||= Thread.current
  if @read_thread != Thread.current
    raise InternalError,
          "cross-thread access to droby channel: "\
          "from #{@read_thread} to #{Thread.current}"
  end

  # The deadline is tracked on the monotonic clock so that wall-clock
  # adjustments can neither shorten nor extend the wait
  if timeout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  end
  remaining_time = timeout

  packet = @incoming.next
  return unmarshal_packet(packet) if packet

  loop do
    if IO.select([io], [], [], remaining_time)
      begin
        @incoming << @read_buffer if io.sysread(1024**2, @read_buffer)
      rescue Errno::EWOULDBLOCK, Errno::EAGAIN
        # Nothing to read after all; fall through and re-check the
        # deadline
      end
    end

    packet = @incoming.next
    return unmarshal_packet(packet) if packet

    if deadline
      remaining_time =
        deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return if remaining_time < 0
    end
  end
rescue SystemCallError, IOError
  raise ComError, "closed communication"
end
#read_wait(timeout: nil) ⇒ Boolean
Wait until there is something to read on the channel
90 91 92 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 90

# Wait until there is something to read on the channel
#
# @param timeout [Numeric, nil] how long to wait, in seconds; it is passed
#   as-is to IO.select
# @return [Boolean] true if the IO became readable, false if the wait
#   timed out
def read_wait(timeout: nil)
  readable = IO.select([io], [], [], timeout)
  readable ? true : false
end
#reset_thread_guard(read_thread = nil, write_thread = nil) ⇒ Object
159 160 161 162 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 159

# Reset the threads that are allowed to read from and write to the channel
#
# #read_packet and #push_write_data latch onto the first thread that calls
# them and raise on access from any other thread. This method replaces the
# latched threads; passing nil lets the next caller latch again.
#
# @param read_thread [Thread, nil] the thread allowed to call #read_packet
# @param write_thread [Thread, nil] the thread allowed to call
#   #push_write_data
def reset_thread_guard(read_thread = nil, write_thread = nil)
  @read_thread = read_thread
  @write_thread = write_thread
end
#to_io ⇒ Object
64 65 66 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 64

# Convert the channel to its IO by delegating to the underlying IO's
# #to_io.
#
# @return [Object] whatever the underlying IO's #to_io returns
def to_io
  io.to_io
end
#unmarshal_packet(packet) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 136

# Convert a received packet into a local object
#
# The packet payload is loaded with Ruby's Marshal, and the result is
# handed to the marshaller's #local_object.
#
# @param packet [#to_s] the received packet
# @return [Object] the object returned by the marshaller
# @raise [ProtocolError] if Marshal.load rejects the payload with a
#   TypeError
def unmarshal_packet(packet)
  unmarshalled =
    begin
      # NOTE(review): Marshal.load can instantiate arbitrary objects; this
      # is only safe if the peer is trusted — confirm the deployment
      # assumptions.
      Marshal.load(packet.to_s)
    rescue TypeError => e
      raise ProtocolError,
            "failed to unmarshal received packet: #{e.message}"
    end

  marshaller.local_object(unmarshalled)
end
#write_buffer_size ⇒ Object
60 61 62 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 60

# The amount of data currently queued in the internal write buffer
#
# The value is counted in bytes, the same unit as the byte limit
# documented for #max_write_buffer_size.
#
# @return [Integer] the number of bytes waiting to be written
def write_buffer_size
  @write_buffer.bytesize
end
#write_packet(object) ⇒ void
This method returns an undefined value.
Write one ruby object (usually an array) as a marshalled packet and send it to #io
153 154 155 156 157 |
# File 'lib/roby/interface/v1/droby_channel.rb', line 153

# Write one ruby object (usually an array) as a marshalled packet and
# send it to #io
#
# The object goes through the marshaller's #dump and Marshal.dump, is
# wrapped in a binary frame of the class chosen in the constructor, and
# the frame's string form is queued with #push_write_data.
#
# @param object [Object] the object to send
# @return [void]
def write_packet(object)
  payload = Marshal.dump(marshaller.dump(object))
  frame = @websocket_packet.new(data: payload, type: :binary)
  push_write_data(frame.to_s)
end