Class: Roby::Interface::V2::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/channel.rb

Overview

A wrapper on top of raw IO that uses droby marshalling to communicate

Defined Under Namespace

Classes: Stats

Constant Summary collapse

WEBSOCKET_CLASSES =

This is a workaround for a very bad performance behavior on first load. These classes are auto-loaded and it takes forever to load them in multithreaded contexts.

[
    WebSocket::Frame::Outgoing::Client,
    WebSocket::Frame::Outgoing::Server,
    WebSocket::Frame::Incoming::Client,
    WebSocket::Frame::Incoming::Server
].freeze
ALLOWED_BASIC_TYPES =
[
    TrueClass, FalseClass, NilClass, Integer, Float, String, Symbol, Time,
    Range
].freeze
None =
Object.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, client, max_write_buffer_size: 25 * 1024**2) ⇒ Channel

Returns a new instance of Channel.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/roby/interface/v2/channel.rb', line 40

# Set up a channel on top of +io+.
#
# The IO is switched to non-blocking mode, and the websocket frame
# classes are picked according to which side of the connection the
# local process is on.
#
# @param io the underlying IO object
# @param client truthy if the local process is the client side of the
#   connection, falsy if it is the server side
# @param [Integer] max_write_buffer_size size limit of the internal
#   write buffer, enforced by {#guard_buffer_size}
def initialize(io, client, max_write_buffer_size: 25 * 1024**2)
    @io = io
    @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @client = client
    @stats = Stats.new(tx: 0, rx: 0)

    # Class used to frame outgoing packets
    @websocket_packet =
        client ? WebSocket::Frame::Outgoing::Client : WebSocket::Frame::Outgoing::Server

    # Decoder for the incoming websocket stream
    incoming_class =
        client? ? WebSocket::Frame::Incoming::Client : WebSocket::Frame::Incoming::Server
    @incoming = incoming_class.new(type: :binary)

    @max_write_buffer_size = max_write_buffer_size
    @read_buffer = String.new
    @write_buffer = String.new
    @write_thread = nil

    # Marshalling configuration; Protocol.setup_channel is given the
    # channel once these are initialized
    @marshallers = {}
    @allowed_objects = Set.new
    @resolved_marshallers = {}
    Protocol.setup_channel(self)
end

Instance Attribute Details

#io#read_nonblock, #write (readonly)

Returns the channel that allows us to communicate to clients.

Returns:

  • (#read_nonblock, #write)

    the channel that allows us to communicate to clients



10
11
12
# File 'lib/roby/interface/v2/channel.rb', line 10

# The underlying IO object, as given to the constructor.
#
# @return the IO used to exchange data with the peer
def io
  @io
end

#max_write_buffer_sizeObject (readonly)

The maximum byte count that the channel can hold on the write side until it bails out



18
19
20
# File 'lib/roby/interface/v2/channel.rb', line 18

# Size limit of the internal write buffer, as given to the constructor.
# {#guard_buffer_size} raises once the buffer grows past this value.
def max_write_buffer_size
  @max_write_buffer_size
end

#statsStats (readonly)

Returns I/O statistics.

Returns:

  • (Stats)

    I/O statistics



38
39
40
# File 'lib/roby/interface/v2/channel.rb', line 38

# I/O statistics of this channel. The +tx+ and +rx+ counters are
# incremented by {#push_write_data} and {#read_data_from_io}.
#
# @return [Stats]
def stats
  @stats
end

Class Method Details

.find_invalid_marshalling_object(object) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/roby/interface/v2/channel.rb', line 186

# Find the object that makes +object+ impossible to dump with Marshal.
#
# Arrays, structs and hashes are searched recursively so that the
# innermost offending object is reported. If every element of a
# container is fine but the container itself cannot be dumped, the
# container is returned.
#
# @param [Object] object the object to inspect
# @return [Object, nil] the first object for which Marshal.dump raises
#   TypeError, or nil if +object+ can be dumped
def self.find_invalid_marshalling_object(object)
    case object
    when Array, Struct, Hash
        # Hash#each yields [key, value] pairs, which are then inspected
        # as arrays by the recursive call
        object.each do |element|
            invalid = find_invalid_marshalling_object(element)
            return invalid if invalid
        end
    end

    # Either a leaf object or a container whose elements are all valid.
    # The explicit nil matters: falling off the `each` above would
    # otherwise return the container itself and report it as invalid.
    begin
        ::Marshal.dump(object)
        nil
    rescue TypeError
        object
    end
end

Instance Method Details

#add_marshaller(*classes) {|channel, object| ... } ⇒ Object

Define a custom marshaller for objects of the given class

Parameters:

  • classes (Array<Class>)

    the classes to use the given marshaller for. This will match instances of subclasses as well. When several registered classes match an object, the marshaller registered for the most specific class wins.

Yield Parameters:

Yield Returns:

  • (Object)

    the marshalled object



215
216
217
218
# File 'lib/roby/interface/v2/channel.rb', line 215

# Register +block+ as the marshaller for instances of +classes+.
#
# The per-class resolution cache used by {#find_marshaller} is reset to
# a copy of the registrations, so that earlier lookups do not shadow
# the new marshaller.
#
# @param [Array<Class>] classes the classes handled by the marshaller
# @yieldparam channel the channel doing the marshalling
# @yieldparam object the object to marshal
# @yieldreturn [Object] the marshalled object
def add_marshaller(*classes, &block)
    classes.each do |klass|
        @marshallers[klass] = block
    end
    @resolved_marshallers = @marshallers.dup
end

#allow_classes(*classes) ⇒ Object



203
204
205
# File 'lib/roby/interface/v2/channel.rb', line 203

# Allow instances of +classes+ to go through the channel unmodified.
#
# This registers, through {#add_marshaller}, a marshaller that returns
# the object itself.
#
# @param [Array<Class>] classes the classes to let through
def allow_classes(*classes)
    add_marshaller(*classes) { |_channel, object| object }
end

#allow_objects(*objects) ⇒ Object



222
223
224
# File 'lib/roby/interface/v2/channel.rb', line 222

# Add +objects+ to the set of objects that {#marshal_filter_object}
# lets through as-is (see {#allowed_object?}).
#
# @param [Array<Object>] objects the objects to allow
# @return the updated set of allowed objects
def allow_objects(*objects)
    objects.each_with_object(@allowed_objects) do |object, allowed|
        allowed.add(object)
    end
end

#allowed_object?(object) ⇒ Boolean

Returns:

  • (Boolean)


226
227
228
# File 'lib/roby/interface/v2/channel.rb', line 226

# Whether +object+ was registered with {#allow_objects}.
#
# @param [Object] object the object to test
# @return [Boolean]
def allowed_object?(object)
    @allowed_objects.include?(object)
end

#client?Boolean

Returns true if the local process is the client, false if it is the server.

Returns:

  • (Boolean)

    true if the local process is the client, false if it is the server



14
# File 'lib/roby/interface/v2/channel.rb', line 14

# Whether the local process is the client side of the connection (the
# constructor picks the Client websocket frame classes in that case).
# NOTE(review): attr_predicate is defined outside this file; presumably
# the predicate reads @client - confirm.
attr_predicate :client?

#closeObject



80
81
82
# File 'lib/roby/interface/v2/channel.rb', line 80

# Close the underlying IO.
def close
    io.close
end

#closed?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/roby/interface/v2/channel.rb', line 84

# Whether the underlying IO has been closed.
#
# @return [Boolean]
def closed?
    io.closed?
end

#eof?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/roby/interface/v2/channel.rb', line 88

# Whether the underlying IO reports end-of-file; delegates to io.eof?.
#
# @return [Boolean]
def eof?
    io.eof?
end

#find_marshaller(object) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
# File 'lib/roby/interface/v2/channel.rb', line 266

# Find the marshaller registered for +object+.
#
# Lookups are cached per class in @resolved_marshallers, which
# {#add_marshaller} resets. On a cache miss, every registered class or
# module that +object+ is a kind of is considered, and the one closest
# to the object's class in its ancestor chain (the most specific one)
# wins.
#
# @param [Object] object the object to be marshalled
# @return [Proc, nil] the marshaller, or nil if none matches
def find_marshaller(object)
    if (cached = @resolved_marshallers[object.class])
        return cached
    end

    # Rank candidates by their position in the ancestry rather than
    # with Module#<=>: the latter returns nil for unrelated modules
    # (e.g. two mixins), which makes min_by raise ArgumentError.
    ancestry = object.class.ancestors
    _, block =
        @marshallers
        .select { |klass, _| object.kind_of?(klass) }
        # Matches that are not in the class ancestry (e.g. modules
        # extended on the instance) rank last
        .min_by { |klass, _| ancestry.index(klass) || ancestry.size }
    @resolved_marshallers[object.class] = block
end

#flushObject



92
93
94
# File 'lib/roby/interface/v2/channel.rb', line 92

# Flush the underlying IO; delegates to io.flush. This does not push
# the channel's own write buffer, see {#push_write_data} for that.
def flush
    io.flush
end

#guard_buffer_sizeObject

Raises:



330
331
332
333
334
335
336
337
# File 'lib/roby/interface/v2/channel.rb', line 330

# Verify that the write buffer did not grow past
# {#max_write_buffer_size}.
#
# @return [nil] if the buffer is within the limit
# @raise [ComError] if the buffer is bigger than the limit
def guard_buffer_size
    buffered = @write_buffer.size
    limit = max_write_buffer_size
    return unless buffered > limit

    raise ComError,
          "channel reached an internal buffer size of #{buffered}, " \
          "which is bigger than the limit of #{limit}, bailing out"
end

#guard_read_threadObject

Raises:



321
322
323
324
325
326
327
328
# File 'lib/roby/interface/v2/channel.rb', line 321

# Verify that reads are always done from the same thread.
#
# The first thread that calls this method becomes the read thread,
# until {#reset_thread_guard} is called.
#
# @return [nil] if called from the read thread
# @raise [InternalError] if called from another thread
def guard_read_thread
    current = Thread.current
    @read_thread ||= current
    unless @read_thread == current
        raise InternalError,
              "cross-thread access to channel while reading: " \
              "expected #{@read_thread} but got #{current}"
    end
end

#guard_write_threadObject

Raises:



312
313
314
315
316
317
318
319
# File 'lib/roby/interface/v2/channel.rb', line 312

# Verify that writes are always done from the same thread.
#
# The first thread that calls this method becomes the write thread,
# until {#reset_thread_guard} is called.
#
# @return [nil] if called from the write thread
# @raise [InternalError] if called from another thread
def guard_write_thread
    @write_thread ||= Thread.current
    return if @write_thread == Thread.current

    # Same wording as the read-side guard ("expected X but got Y")
    raise InternalError,
          "cross-thread access to channel while writing: "\
          "expected #{@write_thread} but got #{Thread.current}"
end

#marshal_basic_object(object) ⇒ Object

rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/roby/interface/v2/channel.rb', line 245

# Marshal objects that are allowed on every channel.
#
# Arrays, sets, hashes and structs are copied, with their elements
# (hash values and struct fields respectively) passed through
# {#marshal_filter_object}. Hash keys are left as-is. Instances of
# ALLOWED_BASIC_TYPES are returned unchanged.
#
# @param [Object] object the object to marshal
# @return [Object] the marshalled object, or None if +object+ is not
#   one of the types handled here
def marshal_basic_object(object) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
    case object
    when Array
        object.map { |element| marshal_filter_object(element) }
    when Set
        filtered = Set.new
        object.each { |element| filtered << marshal_filter_object(element) }
        filtered
    when Hash
        object.transform_values { |value| marshal_filter_object(value) }
    when Struct
        # Work on a copy so that the caller's struct is left untouched
        copy = object.dup
        copy.each_pair do |member, value|
            copy[member] = marshal_filter_object(value)
        end
        copy
    when *ALLOWED_BASIC_TYPES
        object
    else
        None
    end
end

#marshal_filter_object(object) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/roby/interface/v2/channel.rb', line 230

# Convert +object+ into something that is allowed on this channel.
#
# The following are tried in order: the basic types handled by
# {#marshal_basic_object}, the objects registered with
# {#allow_objects}, and the marshallers registered with
# {#add_marshaller}. When none applies, the problem is reported with
# {#report_error} and its return value is used in place of the object.
#
# @param [Object] object the object to filter
# @return [Object] the filtered object
def marshal_filter_object(object)
    basic = marshal_basic_object(object)
    return basic if basic != None
    return object if allowed_object?(object)

    marshaller = find_marshaller(object)
    return marshaller.call(self, object) if marshaller

    report_error(
        "object '#{object}' of class #{object.class} " \
        "not allowed on this interface"
    )
end

#marshal_object(object) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/roby/interface/v2/channel.rb', line 166

# Filter +object+ with {#marshal_filter_object} and dump it with
# Marshal.
#
# Failures are not propagated: they are reported with {#report_error}
# and the value it returns is dumped instead.
#
# @param [Object] object the object to marshal
# @return [String] the marshalled data
def marshal_object(object)
    object = marshal_filter_object(object)
    Marshal.dump(object)
rescue TypeError => type_error
    # Point at the exact object that Marshal refuses to dump
    culprit = self.class.find_invalid_marshalling_object(object)
    error = report_error(
        "failed to marshal #{culprit} of class " \
        "#{culprit.class} in #{object}: #{type_error.message}"
    )
    Marshal.dump(error)
rescue RuntimeError => runtime_error
    error = report_error(
        "failed to marshal #{object}: #{runtime_error.message}"
    )
    Marshal.dump(error)
end

#push_write_data(new_bytes = nil) ⇒ Boolean

Push queued data

The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.

Returns:

  • (Boolean)

    true if there is still data left in the buffer, false otherwise



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/roby/interface/v2/channel.rb', line 290

# Push queued data
#
# The write I/O is buffered. This method appends +new_bytes+ (if given)
# to the internal buffer and writes as much of the buffer as the IO
# accepts without blocking.
#
# @param [String, nil] new_bytes data to append to the buffer
# @return [Boolean, nil] true if there is still data left in the
#   buffer after a write, false if the buffer is now empty. If the IO
#   would block, nothing is written and the result of
#   {#guard_buffer_size} (nil) is returned.
# @raise [ComError] if the communication channel is broken, or if the
#   IO would block while the buffer is over its size limit
def push_write_data(new_bytes = nil)
    guard_write_thread

    @write_buffer.concat(new_bytes) if new_bytes
    written_bytes = io.write_nonblock(@write_buffer)
    @stats.tx += written_bytes

    # write_nonblock reports a number of bytes while String#[] indexes
    # characters: slice by bytes so that the right amount of data is
    # dropped even if the buffer is not binary-encoded
    @write_buffer = @write_buffer.byteslice(written_bytes..-1)
    !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    # The data stays in the buffer, just make sure it does not grow
    # without bounds
    guard_buffer_size
rescue SystemCallError, IOError
    raise ComError, "broken communication channel"
rescue RuntimeError => e
    # Workaround what seems to be a Ruby bug ...
    if e.message.match?(/can.t modify frozen IOError/)
        raise ComError, "broken communication channel"
    end

    raise
end

#read_data_from_io(remaining_time) ⇒ Object



140
141
142
143
144
145
146
# File 'lib/roby/interface/v2/channel.rb', line 140

# Wait for data on the IO and feed it to the incoming frame decoder.
#
# At most 1 MiB is read per call, and the rx statistics are updated
# with the amount of data read.
#
# @param [Numeric, nil] remaining_time how long to wait for the IO to
#   become readable (passed as timeout to IO.select)
# @return [Integer, nil] the updated rx counter, or nil if nothing was
#   read
def read_data_from_io(remaining_time)
    readable = IO.select([@io], [], [], remaining_time)
    return unless readable

    chunk = io.sysread(1024**2, @read_buffer)
    @incoming << @read_buffer if chunk
    @stats.rx += @read_buffer.size
rescue Errno::EWOULDBLOCK, Errno::EAGAIN # rubocop:disable Lint/SuppressedException
    # Nothing to read after all, the caller will try again
end

#read_packet(timeout = 0) ⇒ Object?

Read one packet from #io and unmarshal it

Returns:

  • (Object, nil)

    returns the unmarshalled object, or nil if no full object can be found in the data received so far



110
111
112
113
114
115
116
117
118
# File 'lib/roby/interface/v2/channel.rb', line 110

# Read one packet from {#io} and unmarshal it
#
# A packet that is already fully decoded is returned without touching
# the IO. Otherwise, data is read from the IO for at most +timeout+
# seconds.
#
# @param [Numeric, nil] timeout how long to wait for a full packet
# @return [Object, nil] the unmarshalled object, or nil if no full
#   object can be found in the data received so far
def read_packet(timeout = 0)
    guard_read_thread

    pending = @incoming.next
    return unmarshal_packet(pending) if pending

    read_packet_from_io(timeout)
end

#read_packet_from_io(timeout) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/roby/interface/v2/channel.rb', line 120

# Read data from the IO until a full packet is available or the
# timeout expires.
#
# @param [Numeric, nil] timeout the maximum time to wait, in seconds.
#   nil waits until a packet is received.
# @return [Object, nil] the unmarshalled packet, or nil if the timeout
#   was reached first
# @raise [ComError] if reading from the IO fails
def read_packet_from_io(timeout)
    # The deadline is computed on the monotonic clock so that
    # wall-clock adjustments (NTP, manual changes) can neither cut the
    # wait short nor extend it
    clock = Process::CLOCK_MONOTONIC
    deadline = Process.clock_gettime(clock) + timeout if timeout
    remaining_time = timeout

    loop do
        read_data_from_io(remaining_time)

        if (packet = @incoming.next)
            return unmarshal_packet(packet)
        end

        if deadline
            remaining_time = deadline - Process.clock_gettime(clock)
            return if remaining_time < 0
        end
    end
rescue SystemCallError, IOError
    raise ComError, "closed communication"
end

#read_wait(timeout: nil) ⇒ Boolean

Wait until there is something to read on the channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



102
103
104
# File 'lib/roby/interface/v2/channel.rb', line 102

# Wait until there is something to read on the channel
#
# @param [Numeric, nil] timeout a timeout after which the method will
#   return. Use nil for no timeout
# @return the result of IO.select: nil if the timeout was reached, a
#   truthy value otherwise
def read_wait(timeout: nil)
    IO.select([io], [], [], timeout)
end

#report_error(message) ⇒ Object



179
180
181
182
183
184
# File 'lib/roby/interface/v2/channel.rb', line 179

# Log a marshalling error and build the object that represents it on
# the wire.
#
# The message and the current call stack are logged as warnings on
# Roby::Interface. The returned error carries the message and an empty
# backtrace.
#
# @param [String] message the error message
# @return [Protocol::Error]
def report_error(message)
    Roby::Interface.warn message
    caller.each do |frame|
        Roby::Interface.warn("  #{frame}")
    end

    Protocol::Error.new(message: message, backtrace: [])
end

#reset_thread_guard(read_thread = nil, write_thread = nil) ⇒ Object



278
279
280
281
# File 'lib/roby/interface/v2/channel.rb', line 278

# Reset the threads that are allowed to read from and write to the
# channel (see {#guard_read_thread} and {#guard_write_thread}).
#
# @param [Thread, nil] read_thread the only thread allowed to read. If
#   nil, the next thread that reads becomes the read thread.
# @param [Thread, nil] write_thread the only thread allowed to write.
#   If nil, the next thread that writes becomes the write thread.
def reset_thread_guard(read_thread = nil, write_thread = nil)
    # Each argument goes to the guard of the same name; the two
    # assignments used to be swapped
    @read_thread = read_thread
    @write_thread = write_thread
end

#to_ioObject



76
77
78
# File 'lib/roby/interface/v2/channel.rb', line 76

# The underlying IO converted with to_io, which lets the channel itself
# be passed where Ruby expects an IO-like object (e.g. IO.select).
def to_io
    io.to_io
end

#unmarshal_packet(packet) ⇒ Object



148
149
150
151
152
153
# File 'lib/roby/interface/v2/channel.rb', line 148

# Unmarshal the payload of a received packet.
#
# @param [#to_s] packet the packet, whose string representation is the
#   marshalled data
# @return [Object] the unmarshalled object
# @raise [ProtocolError] if the data cannot be unmarshalled
def unmarshal_packet(packet)
    # NOTE(review): Marshal.load can instantiate arbitrary objects, it
    # is only safe if the peer is trusted
    Marshal.load(packet.to_s) # rubocop:disable Security/MarshalLoad
rescue TypeError, ArgumentError => e
    # Marshal.load raises TypeError on an incompatible format, and
    # ArgumentError on truncated data or unknown classes
    raise ProtocolError,
          "failed to unmarshal received packet: #{e.message}"
end

#write_buffer_sizeObject



72
73
74
# File 'lib/roby/interface/v2/channel.rb', line 72

# Current size of the internal write buffer, i.e. of the data that
# {#push_write_data} has queued but not yet written to the IO.
def write_buffer_size
    @write_buffer.size
end

#write_packet(object) ⇒ void

This method returns an undefined value.

Write one ruby object (usually an array) as a marshalled packet and send it to #io

Parameters:

  • object (Object)

    the object to be sent



160
161
162
163
164
# File 'lib/roby/interface/v2/channel.rb', line 160

# Write one ruby object (usually an array) as a marshalled packet and
# send it to {#io}
#
# The object is marshalled with {#marshal_object}, wrapped into a
# binary websocket frame and queued with {#push_write_data}.
#
# @param [Object] object the object to be sent
# @return [void]
def write_packet(object)
    frame = @websocket_packet.new(
        data: marshal_object(object), type: :binary
    )
    push_write_data(frame.to_s)
end