Class: Roby::Interface::DRobyChannel
- Defined in:
- lib/roby/interface/droby_channel.rb
Overview
A wrapper on top of raw IO that uses droby marshalling to communicate.
Instance Attribute Summary
- #io ⇒ #read_nonblock, #write (readonly)
  The channel that allows us to communicate to clients.
- #marshaller ⇒ DRoby::Marshal (readonly)
  An object used to marshal or unmarshal objects to/from the connection.
- #max_write_buffer_size ⇒ Object (readonly)
  The maximum number of bytes the channel may buffer on the write side; beyond this limit, a write that would block raises ComError.
Instance Method Summary
- #client? ⇒ Boolean
  True if the local process is the client, false if it is the server.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #eof? ⇒ Boolean
- #flush ⇒ Object
- #initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2) ⇒ DRobyChannel (constructor)
  A new instance of DRobyChannel.
- #push_write_data(new_bytes = nil) ⇒ Boolean
  Push queued data.
- #read_packet(timeout = 0) ⇒ Object?
  Read one packet from #io and unmarshal it.
- #read_wait(timeout: nil) ⇒ Boolean
  Wait until there is something to read on the channel.
- #reset_thread_guard ⇒ Object
- #to_io ⇒ Object
- #unmarshal_packet(packet) ⇒ Object
- #write_buffer_size ⇒ Object
- #write_packet(object) ⇒ void
  Write one ruby object (usually an array) as a marshalled packet and send it to #io.
Constructor Details
#initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2) ⇒ DRobyChannel
Returns a new instance of DRobyChannel.
# File 'lib/roby/interface/droby_channel.rb', line 17
def initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2)
    @io = io
    @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @client = client
    @incoming =
        if client?
            WebSocket::Frame::Incoming::Client.new(type: :binary)
        else
            WebSocket::Frame::Incoming::Server.new(type: :binary)
        end
    @marshaller = marshaller
    @max_write_buffer_size = max_write_buffer_size
    @read_buffer = String.new
    @write_buffer = String.new
    @write_thread = nil
end
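The constructor puts the IO in non-blocking mode through `fcntl`. The following is a minimal sketch of what that call achieves, using only the Ruby standard library on a Unix-like system. The pipe is a stand-in for the channel's IO and is an assumption of this example, not part of DRobyChannel.

```ruby
require "fcntl"

# A pipe stands in for the socket that would be handed to the channel.
reader, writer = IO.pipe

# Same call as in the constructor: switch the descriptor to non-blocking mode.
reader.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)

# Read the flags back to confirm that O_NONBLOCK is set.
flags = reader.fcntl(Fcntl::F_GETFL)
nonblocking = (flags & Fcntl::O_NONBLOCK) != 0

# Nothing has been written yet, so a non-blocking read reports that it
# would block instead of stalling the caller.
result = reader.read_nonblock(16, exception: false)
```

Here `result` is the symbol that `read_nonblock` returns in place of raising when `exception: false` is given and no data is available.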
Instance Attribute Details
#io ⇒ #read_nonblock, #write (readonly)
Returns the channel that allows us to communicate to clients.
# File 'lib/roby/interface/droby_channel.rb', line 6
def io
    @io
end
#marshaller ⇒ DRoby::Marshal (readonly)
Returns an object used to marshal or unmarshal objects to/from the connection.
# File 'lib/roby/interface/droby_channel.rb', line 12
def marshaller
    @marshaller
end
#max_write_buffer_size ⇒ Object (readonly)
The maximum number of bytes the channel may buffer on the write side; beyond this limit, a write that would block raises ComError.
# File 'lib/roby/interface/droby_channel.rb', line 15
def max_write_buffer_size
    @max_write_buffer_size
end
Instance Method Details
#client? ⇒ Boolean
Returns true if the local process is the client, false if it is the server.
# File 'lib/roby/interface/droby_channel.rb', line 9
attr_predicate :client?
#close ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 43
def close
    io.close
end
#closed? ⇒ Boolean
# File 'lib/roby/interface/droby_channel.rb', line 47
def closed?
    io.closed?
end
#eof? ⇒ Boolean
# File 'lib/roby/interface/droby_channel.rb', line 51
def eof?
    io.eof?
end
#flush ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 55
def flush
    io.flush
end
#push_write_data(new_bytes = nil) ⇒ Boolean
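Push queued data.
The write I/O is buffered. This method appends new_bytes, if given, to the internal buffer and then writes as much of the buffer as the IO accepts. It returns true if data is still pending in the buffer after a successful write.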
# File 'lib/roby/interface/droby_channel.rb', line 147
def push_write_data(new_bytes = nil)
    @write_thread ||= Thread.current
    if @write_thread != Thread.current
        raise InternalError, "cross-thread access to droby channel: from #{@write_thread} to #{Thread.current}"
    end

    @write_buffer.concat(new_bytes) if new_bytes
    written_bytes = io.syswrite(@write_buffer)

    @write_buffer = @write_buffer[written_bytes..-1]
    !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    if @write_buffer.size > max_write_buffer_size
        raise ComError, "droby_channel reached an internal buffer size of #{@write_buffer.size}, which is bigger than the limit of #{max_write_buffer_size}, bailing out"
    end
rescue SystemCallError, IOError, EOFError
    raise ComError, "broken communication channel"
rescue RuntimeError => e
    # Workaround what seems to be a Ruby bug ...
    if e.message =~ /can.t modify frozen IOError/
        raise ComError, "broken communication channel"
    else
        raise
    end
end
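The buffering scheme used by push_write_data can be illustrated in isolation. The sketch below is not the library's code: `BufferedWriter`, `OverflowError`, and `max_pending` are hypothetical names, and it swaps `syswrite` plus `rescue` for `write_nonblock(..., exception: false)`. Unlike the method above, it returns true in the would-block case rather than the value of the `if` expression.

```ruby
# Hypothetical stand-in for the write side of a channel.
class BufferedWriter
  class OverflowError < StandardError; end

  attr_reader :pending

  def initialize(io, max_pending: 1024)
    @io = io
    @max_pending = max_pending
    @pending = String.new
  end

  # Appends new_bytes (if any) and writes as much as the IO accepts.
  # Returns true if bytes are still pending afterwards.
  def push(new_bytes = nil)
    @pending << new_bytes if new_bytes
    return false if @pending.empty?

    result = @io.write_nonblock(@pending, exception: false)
    if result == :wait_writable
      # The IO accepts nothing right now: keep the data, but refuse to
      # let the buffer grow past the configured limit.
      if @pending.bytesize > @max_pending
        raise OverflowError, "#{@pending.bytesize} bytes pending, limit is #{@max_pending}"
      end
      return true
    end

    # Drop the bytes that were actually written; the rest stays queued.
    @pending = @pending.byteslice(result..-1)
    !@pending.empty?
  end
end

reader, writer_io = IO.pipe
writer = BufferedWriter.new(writer_io)
still_pending = writer.push("hello")
received = reader.read_nonblock(16)
```

A caller would typically keep calling `push` with no argument, for instance from an event loop, until it returns false.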
#read_packet(timeout = 0) ⇒ Object?
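Read one packet from #io and unmarshal it. Returns nil if no complete packet arrives before the timeout expires; a nil timeout waits indefinitely.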
# File 'lib/roby/interface/droby_channel.rb', line 73
def read_packet(timeout = 0)
    @read_thread ||= Thread.current
    if @read_thread != Thread.current
        raise InternalError, "cross-thread access to droby channel: from #{@read_thread} to #{Thread.current}"
    end

    deadline = Time.now + timeout if timeout
    remaining_time = timeout

    if packet = @incoming.next
        return unmarshal_packet(packet)
    end

    while true
        if IO.select([io], [], [], remaining_time)
            begin
                if io.sysread(1024 ** 2, @read_buffer)
                    @incoming << @read_buffer
                end
            rescue Errno::EWOULDBLOCK, Errno::EAGAIN
            end
        end

        if packet = @incoming.next
            return unmarshal_packet(packet)
        end

        if deadline
            remaining_time = deadline - Time.now
            return if remaining_time < 0
        end
    end
rescue SystemCallError, EOFError, IOError
    raise ComError, "closed communication"
end
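The deadline loop in read_packet follows a common pattern: try to extract a packet from what is already buffered, otherwise wait on the IO for the remaining time and try again. The sketch below reproduces that pattern with the standard library only. It assumes a hypothetical 4-byte length-prefixed framing in place of WebSocket frames, uses `read_nonblock` instead of `sysread`, and uses the monotonic clock instead of `Time.now`; `extract_frame` and `read_frame` are names invented for this example.

```ruby
# Hypothetical framing for this sketch only: a 4-byte big-endian length
# followed by the payload. DRobyChannel itself uses WebSocket frames.
def extract_frame(buffer)
  return nil if buffer.bytesize < 4

  length = buffer.unpack1("N")
  return nil if buffer.bytesize < 4 + length

  buffer.slice!(0, 4)
  buffer.slice!(0, length)
end

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Returns one frame payload, or nil if none is complete before the
# timeout expires. A nil timeout waits indefinitely. The buffer must be
# a binary string (String.new) owned by the caller.
def read_frame(io, buffer, timeout = 0)
  deadline = monotonic_now + timeout if timeout
  remaining = timeout

  frame = extract_frame(buffer)
  return frame if frame

  loop do
    if IO.select([io], nil, nil, remaining)
      chunk = io.read_nonblock(4096, exception: false)
      raise EOFError, "closed communication" if chunk.nil?

      buffer << chunk unless chunk == :wait_readable
    end

    frame = extract_frame(buffer)
    return frame if frame

    if deadline
      remaining = deadline - monotonic_now
      return nil if remaining <= 0
    end
  end
end

reader, writer = IO.pipe
buffer = String.new
nothing_yet = read_frame(reader, buffer, 0)
payload = "ping"
writer.write([payload.bytesize].pack("N") + payload)
writer.flush
frame = read_frame(reader, buffer, 1)
```

With the default timeout of 0 the call polls once and returns immediately, which matches how the method above behaves when called without arguments.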
#read_wait(timeout: nil) ⇒ Boolean
Wait until there is something to read on the channel. Returns true if the IO became readable, false if the timeout expired first.
# File 'lib/roby/interface/droby_channel.rb', line 65
def read_wait(timeout: nil)
    !!IO.select([io], [], [], timeout)
end
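The `!!IO.select(...)` idiom converts the result of `IO.select` (an array of ready IOs, or nil on timeout) into a plain boolean. A small sketch with a pipe shows both outcomes; `readable?` is a hypothetical helper that mirrors the method body above.

```ruby
# Hypothetical helper with the same body as read_wait, taking the IO
# as an argument so it can be tried without a channel.
def readable?(io, timeout: nil)
  !!IO.select([io], [], [], timeout)
end

reader, writer = IO.pipe

# Nothing written yet: a zero timeout polls and reports "not readable".
ready_before = readable?(reader, timeout: 0)

writer.write("x")
writer.flush

# One byte is now waiting in the pipe.
ready_after = readable?(reader, timeout: 1)
```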
#reset_thread_guard ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 135
def reset_thread_guard
    @write_thread = nil
    @read_thread = nil
end
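The guard that this method resets is the `@read_thread` / `@write_thread` check at the top of read_packet and push_write_data: the first thread to use the channel becomes its owner, and any other thread gets an error until the guard is reset. A minimal sketch of that idea follows; `ThreadGuard` and `CrossThreadError` are hypothetical names, not part of Roby.

```ruby
# Hypothetical stand-alone version of the ownership check.
class ThreadGuard
  class CrossThreadError < StandardError; end

  def initialize
    @owner = nil
  end

  # The first caller becomes the owner; other threads are rejected.
  def check!
    @owner ||= Thread.current
    return if @owner == Thread.current

    raise CrossThreadError, "cross-thread access: from #{@owner} to #{Thread.current}"
  end

  # Forget the owner so that another thread may take over.
  def reset
    @owner = nil
  end
end

guard = ThreadGuard.new
guard.check! # the main thread becomes the owner

# Runs check! in a fresh thread and reports what happened.
attempt = lambda do
  Thread.new do
    begin
      guard.check!
      :ok
    rescue ThreadGuard::CrossThreadError
      :rejected
    end
  end.value
end

first_attempt = attempt.call
guard.reset
second_attempt = attempt.call
```

Resetting is useful when a channel is deliberately handed over from one thread to another.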
#to_io ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 39
def to_io
    io.to_io
end
#unmarshal_packet(packet) ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 110
def unmarshal_packet(packet)
    unmarshalled = begin
                       Marshal.load(packet.to_s)
                   rescue TypeError => e
                       raise ProtocolError, "failed to unmarshal received packet: #{e.message}"
                   end
    marshaller.local_object(unmarshalled)
end
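The first half of this method wraps `Marshal.load` so that malformed input surfaces as a protocol-level error rather than a bare `TypeError`. The sketch below shows that wrapping on its own; `ProtocolError` is redefined locally as a stand-in for the Roby class, `safe_load` is a hypothetical name, and the `marshaller.local_object` step is left out.

```ruby
# Local stand-in for the library's ProtocolError.
class ProtocolError < StandardError; end

# Loads marshalled bytes, translating TypeError into ProtocolError.
def safe_load(bytes)
  Marshal.load(bytes)
rescue TypeError => e
  raise ProtocolError, "failed to unmarshal received packet: #{e.message}"
end

# Well-formed data round-trips unchanged.
round_trip = safe_load(Marshal.dump([:ok, 1]))

# Bytes that do not start with a valid Marshal header are rejected.
error =
  begin
    safe_load("not marshal data")
    nil
  rescue ProtocolError => e
    e
  end
```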
#write_buffer_size ⇒ Object
# File 'lib/roby/interface/droby_channel.rb', line 35
def write_buffer_size
    @write_buffer.size
end
#write_packet(object) ⇒ void
This method returns an undefined value.
Write one Ruby object (usually an array) as a marshalled packet and send it to #io.
# File 'lib/roby/interface/droby_channel.rb', line 123
def write_packet(object)
    marshalled = Marshal.dump(marshaller.dump(object))
    packet =
        if client?
            WebSocket::Frame::Outgoing::Client.new(data: marshalled, type: :binary)
        else
            WebSocket::Frame::Outgoing::Server.new(data: marshalled, type: :binary)
        end

    push_write_data(packet.to_s)
end