Class: Roby::Interface::V1::DRobyChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v1/droby_channel.rb

Overview

A wrapper on top of raw IO that uses droby marshalling to communicate

Constant Summary collapse

WEBSOCKET_CLASSES =

This is a workaround for a very bad performance behavior on first load. These classes are auto-loaded and it takes forever to load them in multithreaded contexts.

[
    WebSocket::Frame::Outgoing::Client,
    WebSocket::Frame::Outgoing::Server,
    WebSocket::Frame::Incoming::Client,
    WebSocket::Frame::Incoming::Server
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25 * 1024**2) ⇒ DRobyChannel

Returns a new instance of DRobyChannel.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/roby/interface/v1/droby_channel.rb', line 32

def initialize(
    io, client,
    marshaller: DRoby::Marshal.new(auto_create_plans: true),
    max_write_buffer_size: 25 * 1024**2
)
    @io = io
    @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @client = client
    @websocket_packet =
        if client
            WebSocket::Frame::Outgoing::Client
        else
            WebSocket::Frame::Outgoing::Server
        end

    @incoming =
        if client?
            WebSocket::Frame::Incoming::Client.new(type: :binary)
        else
            WebSocket::Frame::Incoming::Server.new(type: :binary)
        end
    @marshaller = marshaller
    @max_write_buffer_size = max_write_buffer_size
    @read_buffer = String.new
    @write_buffer = String.new
    @write_thread = nil
end

Instance Attribute Details

#io#read_nonblock, #write (readonly)

Returns the channel that allows us to communicate to clients.

Returns:

  • (#read_nonblock, #write)

    the channel that allows us to communicate to clients



10
11
12
# File 'lib/roby/interface/v1/droby_channel.rb', line 10

def io
  @io
end

#marshallerDRoby::Marshal (readonly)

Returns an object used to marshal or unmarshal objects to/from the connection.

Returns:

  • (DRoby::Marshal)

    an object used to marshal or unmarshal objects to/from the connection



17
18
19
# File 'lib/roby/interface/v1/droby_channel.rb', line 17

def marshaller
  @marshaller
end

#max_write_buffer_sizeObject (readonly)

The maximum byte count that the channel can hold on the write side until it bails out



20
21
22
# File 'lib/roby/interface/v1/droby_channel.rb', line 20

def max_write_buffer_size
  @max_write_buffer_size
end

Instance Method Details

#client?Boolean

Returns true if the local process is the client or the server.

Returns:

  • (Boolean)

    true if the local process is the client or the server



14
# File 'lib/roby/interface/v1/droby_channel.rb', line 14

attr_predicate :client?

#closeObject



68
69
70
# File 'lib/roby/interface/v1/droby_channel.rb', line 68

def close
    io.close
end

#closed?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/roby/interface/v1/droby_channel.rb', line 72

def closed?
    io.closed?
end

#eof?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/roby/interface/v1/droby_channel.rb', line 76

def eof?
    io.eof?
end

#flushObject



80
81
82
# File 'lib/roby/interface/v1/droby_channel.rb', line 80

def flush
    io.flush
end

#push_write_data(new_bytes = nil) ⇒ Boolean

Push queued data

The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.

Returns:

  • (Boolean)

    true if there is still data left in the buffe, false otherwise



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/roby/interface/v1/droby_channel.rb', line 171

def push_write_data(new_bytes = nil)
    @write_thread ||= Thread.current
    if @write_thread != Thread.current
        raise InternalError,
              "cross-thread access to droby channel: "\
              "from #{@write_thread} to #{Thread.current}"
    end

    @write_buffer.concat(new_bytes) if new_bytes
    written_bytes = io.syswrite(@write_buffer)

    @write_buffer = @write_buffer[written_bytes..-1]
    !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    if @write_buffer.size > max_write_buffer_size
        raise ComError,
              "droby_channel reached an internal buffer size of "\
              "#{@write_buffer.size}, which is bigger than the limit "\
              "of #{max_write_buffer_size}, bailing out"
    end
rescue SystemCallError, IOError
    raise ComError, "broken communication channel"
rescue RuntimeError => e
    # Workaround what seems to be a Ruby bug ...
    if e.message =~ /can.t modify frozen IOError/
        raise ComError, "broken communication channel"
    else
        raise
    end
end

#read_packet(timeout = 0) ⇒ Object?

Read one packet from #io and unmarshal it

Returns:

  • (Object, nil)

    returns the unmarshalled object, or nil if no full object can be found in the data received so far



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/roby/interface/v1/droby_channel.rb', line 98

def read_packet(timeout = 0)
    @read_thread ||= Thread.current
    if @read_thread != Thread.current
        raise InternalError,
              "cross-thread access to droby channel: "\
              "from #{@read_thread} to #{Thread.current}"
    end

    deadline       = Time.now + timeout if timeout
    remaining_time = timeout

    if packet = @incoming.next
        return unmarshal_packet(packet)
    end

    loop do
        if IO.select([io], [], [], remaining_time)
            begin
                if io.sysread(1024**2, @read_buffer)
                    @incoming << @read_buffer
                end
            rescue Errno::EWOULDBLOCK, Errno::EAGAIN
            end
        end

        if packet = @incoming.next
            return unmarshal_packet(packet)
        end

        if deadline
            remaining_time = deadline - Time.now
            return if remaining_time < 0
        end
    end
rescue SystemCallError, IOError
    raise ComError, "closed communication"
end

#read_wait(timeout: nil) ⇒ Boolean

Wait until there is something to read on the channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



90
91
92
# File 'lib/roby/interface/v1/droby_channel.rb', line 90

def read_wait(timeout: nil)
    !!IO.select([io], [], [], timeout)
end

#reset_thread_guard(read_thread = nil, write_thread = nil) ⇒ Object



159
160
161
162
# File 'lib/roby/interface/v1/droby_channel.rb', line 159

def reset_thread_guard(read_thread = nil, write_thread = nil)
    @write_thread = read_thread
    @read_thread = write_thread
end

#to_ioObject



64
65
66
# File 'lib/roby/interface/v1/droby_channel.rb', line 64

def to_io
    io.to_io
end

#unmarshal_packet(packet) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/roby/interface/v1/droby_channel.rb', line 136

def unmarshal_packet(packet)
    unmarshalled =
        begin
            Marshal.load(packet.to_s)
        rescue TypeError => e
            raise ProtocolError,
                  "failed to unmarshal received packet: #{e.message}"
        end

    marshaller.local_object(unmarshalled)
end

#write_buffer_sizeObject



60
61
62
# File 'lib/roby/interface/v1/droby_channel.rb', line 60

def write_buffer_size
    @write_buffer.size
end

#write_packet(object) ⇒ void

This method returns an undefined value.

Write one ruby object (usually an array) as a marshalled packet and send it to #io

Parameters:

  • object (Object)

    the object to be sent



153
154
155
156
157
# File 'lib/roby/interface/v1/droby_channel.rb', line 153

def write_packet(object)
    marshalled = Marshal.dump(marshaller.dump(object))
    packet = @websocket_packet.new(data: marshalled, type: :binary)
    push_write_data(packet.to_s)
end