Class: Roby::Interface::DRobyChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/droby_channel.rb

Overview

A wrapper on top of raw IO that uses droby marshalling to communicate

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2) ⇒ DRobyChannel

Returns a new instance of DRobyChannel.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/roby/interface/droby_channel.rb', line 17

# Wrap a raw IO object into a droby-marshalling channel
#
# @param [IO] io the underlying IO. It is switched to non-blocking mode.
# @param [Boolean] client whether the local process is the client side of
#   the connection. It selects between client and server WebSocket framing.
# @param [DRoby::Marshal] marshaller the object used to marshal and
#   unmarshal the objects exchanged on the connection
# @param [Integer] max_write_buffer_size the size the internal write buffer
#   may reach before {#push_write_data} bails out
def initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2)
    @io = io
    # Add O_NONBLOCK to the flags already set on the descriptor. Passing
    # O_NONBLOCK alone to F_SETFL would reset every other status flag.
    current_flags = @io.fcntl(Fcntl::F_GETFL)
    @io.fcntl(Fcntl::F_SETFL, current_flags | Fcntl::O_NONBLOCK)
    @client = client

    @incoming =
        if client?
            WebSocket::Frame::Incoming::Client.new(type: :binary)
        else
            WebSocket::Frame::Incoming::Server.new(type: :binary)
        end
    @marshaller = marshaller
    @max_write_buffer_size = max_write_buffer_size
    @read_buffer  = String.new
    @write_buffer = String.new
    # Thread guards: each is set by the first thread that writes (resp.
    # reads), and cleared by #reset_thread_guard
    @write_thread = nil
    @read_thread = nil
end

Instance Attribute Details

#io ⇒ #read_nonblock, #write (readonly)

Returns the channel that allows us to communicate to clients.

Returns:

  • (#read_nonblock, #write)

    the channel that allows us to communicate to clients



6
7
8
# File 'lib/roby/interface/droby_channel.rb', line 6

# The channel that allows us to communicate to clients
#
# @return [#read_nonblock, #write]
def io; @io; end

#marshaller ⇒ DRoby::Marshal (readonly)

Returns an object used to marshal or unmarshal objects to/from the connection.

Returns:

  • (DRoby::Marshal)

    an object used to marshal or unmarshal objects to/from the connection



12
13
14
# File 'lib/roby/interface/droby_channel.rb', line 12

# The object used to marshal or unmarshal objects to/from the connection
#
# @return [DRoby::Marshal]
def marshaller; @marshaller; end

#max_write_buffer_size ⇒ Object (readonly)

The maximum byte count that the channel can hold on the write side until it bails out



15
16
17
# File 'lib/roby/interface/droby_channel.rb', line 15

# The maximum byte count that the channel can hold on the write side until
# it bails out
def max_write_buffer_size; @max_write_buffer_size; end

Instance Method Details

#client? ⇒ Boolean

Returns true if the local process is the client, false if it is the server.

Returns:

  • (Boolean)

    true if the local process is the client, false if it is the server



9
# File 'lib/roby/interface/droby_channel.rb', line 9

# Predicate telling whether the local process is the client side of the
# connection (as opposed to the server side). #initialize stores its
# `client` argument in @client and uses this predicate to pick client or
# server WebSocket framing.
#
# NOTE(review): attr_predicate is defined outside this file; presumably it
# generates a `client?` reader backed by @client -- confirm.
attr_predicate :client?

#close ⇒ Object



43
44
45
# File 'lib/roby/interface/droby_channel.rb', line 43

# Close the underlying IO
def close; io.close; end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/roby/interface/droby_channel.rb', line 47

# Whether the underlying IO has been closed
#
# @return [Boolean]
def closed?; io.closed?; end

#eof? ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/roby/interface/droby_channel.rb', line 51

# Whether the underlying IO is at end-of-file
#
# NOTE(review): IO#eof? goes through Ruby's buffered read layer while
# #read_packet uses sysread; confirm that mixing both on the same IO is
# safe for the callers of this method.
#
# @return [Boolean]
def eof?; io.eof?; end

#flush ⇒ Object



55
56
57
# File 'lib/roby/interface/droby_channel.rb', line 55

# Flush the underlying IO
#
# @return whatever the underlying IO's #flush returns
def flush; io.flush; end

#push_write_data(new_bytes = nil) ⇒ Boolean

Push queued data

The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.

Returns:

  • (Boolean)

    true if there is still data left in the buffer, false otherwise



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/roby/interface/droby_channel.rb', line 147

# Push queued data
#
# The write I/O is buffered. This method appends new_bytes (if given) to
# the internal buffer and then attempts to write the buffer to #io,
# keeping whatever could not be written for a later call.
#
# @param [String,nil] new_bytes data to append to the buffer before
#   writing, or nil to only push already-queued data
# @return [Boolean] true if there is still data left in the buffer, false
#   otherwise
# @raise [InternalError] if called from a different thread than the one
#   that first wrote on this channel (see #reset_thread_guard)
# @raise [ComError] if the IO cannot accept data and the buffer is bigger
#   than #max_write_buffer_size, or if the communication channel is broken
def push_write_data(new_bytes = nil)
    @write_thread ||= Thread.current
    if @write_thread != Thread.current
        raise InternalError, "cross-thread access to droby channel: from #{@write_thread} to #{Thread.current}"
    end

    @write_buffer.concat(new_bytes) if new_bytes
    written_bytes = io.syswrite(@write_buffer)

    # syswrite reports a byte count, so the buffer must be cut on a byte
    # offset and not on a character offset
    @write_buffer = @write_buffer.byteslice(written_bytes..-1)
    !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    if @write_buffer.bytesize > max_write_buffer_size
        raise ComError, "droby_channel reached an internal buffer size of #{@write_buffer.bytesize}, which is bigger than the limit of #{max_write_buffer_size}, bailing out"
    end

    # Nothing could be written: the queued data is still pending, and the
    # caller must be told so
    !@write_buffer.empty?
rescue SystemCallError, IOError
    raise ComError, "broken communication channel"
rescue RuntimeError => e
    # Workaround what seems to be a Ruby bug ...
    raise unless e.message =~ /can.t modify frozen IOError/

    raise ComError, "broken communication channel"
end

#read_packet(timeout = 0) ⇒ Object?

Read one packet from #io and unmarshal it

Returns:

  • (Object, nil)

    returns the unmarshalled object, or nil if no full object can be found in the data received so far



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/roby/interface/droby_channel.rb', line 73

# Read one packet from {#io} and unmarshal it
#
# Packets already decoded from previously received data are returned
# first. Otherwise, the method waits for data on #io, feeds it to the
# frame decoder and returns as soon as a full packet is available.
#
# @param [Numeric,nil] timeout how long to wait for a full packet, in
#   seconds. 0 only processes what is readily available, nil waits
#   forever.
# @return [Object,nil] the unmarshalled object, or nil if no full object
#   can be found in the data received so far
# @raise [InternalError] if called from a different thread than the one
#   that first read from this channel (see #reset_thread_guard)
# @raise [ComError] if the communication channel is closed or broken
def read_packet(timeout = 0)
    @read_thread ||= Thread.current
    if @read_thread != Thread.current
        raise InternalError, "cross-thread access to droby channel: from #{@read_thread} to #{Thread.current}"
    end

    # Use the monotonic clock for the deadline: unlike Time.now, it is not
    # affected by adjustments of the system time
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout if timeout
    remaining_time = timeout

    if (packet = @incoming.next)
        return unmarshal_packet(packet)
    end

    while true
        if IO.select([io], [], [], remaining_time)
            begin
                if io.sysread(1024**2, @read_buffer)
                    @incoming << @read_buffer
                end
            rescue Errno::EWOULDBLOCK, Errno::EAGAIN
                # Nothing to read after all, fall through to the
                # deadline check
            end
        end

        if (packet = @incoming.next)
            return unmarshal_packet(packet)
        end

        if deadline
            remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
            return if remaining_time < 0
        end
    end
rescue SystemCallError, EOFError, IOError
    raise ComError, "closed communication"
end

#read_wait(timeout: nil) ⇒ Boolean

Wait until there is something to read on the channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



65
66
67
# File 'lib/roby/interface/droby_channel.rb', line 65

# Wait until there is something to read on the channel
#
# @param [Numeric,nil] timeout a timeout after which the method will
#   return. Use nil for no timeout
# @return [Boolean] false if the timeout was reached, true otherwise
def read_wait(timeout: nil)
    ready = IO.select([io], [], [], timeout)
    ready ? true : false
end

#reset_thread_guard ⇒ Object



135
136
137
138
# File 'lib/roby/interface/droby_channel.rb', line 135

# Forget which threads are currently recorded as the channel's reader and
# writer, so that the next read and the next write can each come from any
# thread
def reset_thread_guard
    @write_thread = @read_thread = nil
end

#to_io ⇒ Object



39
40
41
# File 'lib/roby/interface/droby_channel.rb', line 39

# Convert the channel to its underlying IO, by delegating to the #to_io
# method of {#io}
def to_io; io.to_io; end

#unmarshal_packet(packet) ⇒ Object



110
111
112
113
114
115
116
# File 'lib/roby/interface/droby_channel.rb', line 110

# Unmarshal a received packet into a local object
#
# The packet is first loaded with Ruby's Marshal, and the result is then
# resolved through {#marshaller}'s #local_object.
#
# SECURITY: Marshal.load can instantiate arbitrary objects. This is only
# safe as long as the peer is trusted.
#
# @param [#to_s] packet the packet, whose string representation is the
#   marshalled data
# @return [Object] the unmarshalled object
# @raise [ProtocolError] if the data cannot be loaded by Marshal
def unmarshal_packet(packet)
    unmarshalled =
        begin
            Marshal.load(packet.to_s)
        rescue TypeError, ArgumentError => e
            # TypeError is raised on data that is not a marshal stream,
            # ArgumentError on truncated or otherwise unloadable streams
            raise ProtocolError, "failed to unmarshal received packet: #{e.message}"
        end
    marshaller.local_object(unmarshalled)
end

#write_buffer_size ⇒ Object



35
36
37
# File 'lib/roby/interface/droby_channel.rb', line 35

# The size of the data currently queued in the internal write buffer
def write_buffer_size; @write_buffer.length; end

#write_packet(object) ⇒ void

This method returns an undefined value.

Write one ruby object (usually an array) as a marshalled packet and send it to #io

Parameters:

  • object (Object)

    the object to be sent



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/roby/interface/droby_channel.rb', line 123

# Write one ruby object (usually an array) as a marshalled packet and send
# it to {#io}
#
# The object is dumped through {#marshaller}, serialized with Ruby's
# Marshal and wrapped into a binary WebSocket frame (client or server
# flavour, depending on {#client?}) before being queued with
# {#push_write_data}.
#
# @param [Object] object the object to be sent
# @return [void]
def write_packet(object)
    payload = Marshal.dump(marshaller.dump(object))
    frame_class =
        client? ? WebSocket::Frame::Outgoing::Client : WebSocket::Frame::Outgoing::Server
    frame = frame_class.new(data: payload, type: :binary)
    push_write_data(frame.to_s)
end